eventmachine/eventmachine

View on GitHub
lib/em/protocols/postgres3.rb

Summary

Maintainability
A
3 hrs
Test Coverage
#--
#
# Author:: Francis Cianfrocca (gmail: blackhedd)
# Homepage::  http://rubyeventmachine.com
# Date:: 15 November 2006
# 
# See EventMachine and EventMachine::Connection for documentation and
# usage examples.
#
#----------------------------------------------------------------------------
#
# Copyright (C) 2006-08 by Francis Cianfrocca. All Rights Reserved.
# Gmail: blackhedd
# 
# This program is free software; you can redistribute it and/or modify
# it under the terms of either: 1) the GNU General Public License
# as published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version; or 2) Ruby's License.
# 
# See the file COPYING for complete licensing information.
#
#---------------------------------------------------------------------------
#
# 
# 

require 'postgres-pr/message'
require 'postgres-pr/connection'
require 'stringio'

# @private
class StringIO
  # Reads exactly +n+ bytes.
  #
  # If the data read is nil an EOFError is raised.
  #
  # If the data read is too short an IOError is raised
  def readbytes(n)
    str = read(n)
    if str == nil
      raise EOFError, "End of file reached"
    end
    if str.size < n
      raise IOError, "data truncated"
    end
    str
  end
  alias read_exactly_n_bytes readbytes
end


module EventMachine
  module Protocols
    # PROVISIONAL IMPLEMENTATION of an evented Postgres client.
    # This implements version 3 of the Postgres wire protocol, which will work
    # with any Postgres version from roughly 7.4 onward.
    #
    # Objective: we want to access Postgres databases without requiring threads.
    # Until now this has been a problem because the Postgres client implementations
    # have all made use of blocking I/O calls, which is incompatible with a
    # thread-free evented model.
    #
    # But rather than re-implement the Postgres Wire3 protocol, we're taking advantage
    # of the existing postgres-pr library, which was originally written by Michael
    # Neumann but (at this writing) appears to be no longer maintained. Still, it's
    # in basically a production-ready state, and the wire protocol isn't that complicated
    # anyway.
    #
    # We're tucking in a bunch of require statements that may not be present in garden-variety
    # EM installations. Until we find a good way to only require these if a program
    # requires postgres, this file will need to be required explicitly.
    #
    # We need to monkeypatch StringIO because it lacks the #readbytes method needed
    # by postgres-pr.
    # The StringIO monkeypatch is lifted from the standard library readbytes.rb,
    # which adds method #readbytes directly to class IO. But StringIO is not a subclass of IO.
    # It is modified to raise an IOError instead of TruncatedDataException since the exception is unused.
    #
    # We cloned the handling of postgres messages from lib/postgres-pr/connection.rb
    # in the postgres-pr library, and modified it for event-handling.
    #
    # TODO: The password handling in dispatch_conn_message is totally incomplete.
    #
    #
    # We return Deferrables from the user-level operations surfaced by this interface.
    # Experimentally, we're using the pattern of always returning a boolean value as the
    # first argument of a deferrable callback to indicate success or failure. This is
    # instead of the traditional pattern of calling Deferrable#succeed or #fail, and
    # requiring the user to define both a callback and an errback function.
    #
    # === Usage
    #  EM.run {
    #    db = EM.connect_unix_domain( "/tmp/.s.PGSQL.5432", EM::P::Postgres3 )
    #    db.connect( dbname, username, psw ).callback do |status|
    #      if status
    #        db.query( "select * from some_table" ).callback do |status, result, errors|
    #          if status
    #            result.rows.each do |row|
    #              p row
    #            end
    #          end
    #        end
    #      end
    #    end
    #  }
    class Postgres3 < EventMachine::Connection
      include PostgresPR

      def initialize
        @data = ""
        @params = {}
      end

      def connect db, user, psw=nil
        d = EM::DefaultDeferrable.new
        d.timeout 15

        if @pending_query || @pending_conn
          d.succeed false, "Operation already in progress"
        else
          @pending_conn = d
          prms = {"user"=>user, "database"=>db}
          @user = user
          if psw
            @password = psw
            #prms["password"] = psw
          end
          send_data PostgresPR::StartupMessage.new( 3 << 16, prms ).dump
        end

        d
      end

      def query sql
        d = EM::DefaultDeferrable.new
        d.timeout 15

        if @pending_query || @pending_conn
          d.succeed false, "Operation already in progress"
        else
          @r = PostgresPR::Connection::Result.new
          @e = []
          @pending_query = d
          send_data PostgresPR::Query.dump(sql)
        end

        d
      end


      def receive_data data
        @data << data
        while @data.length >= 5
          pktlen = @data[1...5].unpack("N").first
          if @data.length >= (1 + pktlen)
            pkt = @data.slice!(0...(1+pktlen))
            m = StringIO.open( pkt, "r" ) {|io| PostgresPR::Message.read( io ) }
            if @pending_conn
              dispatch_conn_message m
            elsif @pending_query
              dispatch_query_message m
            else
              raise "Unexpected message from database"
            end
          else
            break # very important, break out of the while
          end
        end
      end


      def unbind
        if o = (@pending_query || @pending_conn)
          o.succeed false, "lost connection"
        end
      end

      # Cloned and modified from the postgres-pr.
      def dispatch_conn_message msg
        case msg
        when AuthentificationClearTextPassword
          raise ArgumentError, "no password specified" if @password.nil?
          send_data PasswordMessage.new(@password).dump

        when AuthentificationCryptPassword
          raise ArgumentError, "no password specified" if @password.nil?
          send_data PasswordMessage.new(@password.crypt(msg.salt)).dump

        when AuthentificationMD5Password
          raise ArgumentError, "no password specified" if @password.nil?
          require 'digest/md5'

          m = Digest::MD5.hexdigest(@password + @user)
          m = Digest::MD5.hexdigest(m + msg.salt)
          m = 'md5' + m
          send_data PasswordMessage.new(m).dump

        when AuthentificationKerberosV4, AuthentificationKerberosV5, AuthentificationSCMCredential
          raise "unsupported authentification"

        when AuthentificationOk
        when ErrorResponse
          raise msg.field_values.join("\t")
        when NoticeResponse
          @notice_processor.call(msg) if @notice_processor
        when ParameterStatus
          @params[msg.key] = msg.value
        when BackendKeyData
          # TODO
          #p msg
        when ReadyForQuery
          # TODO: use transaction status
          pc,@pending_conn = @pending_conn,nil
          pc.succeed true
        else
          raise "unhandled message type"
        end
      end

      # Cloned and modified from the postgres-pr.
      def dispatch_query_message msg
        case msg
        when DataRow
          @r.rows << msg.columns
        when CommandComplete
          @r.cmd_tag = msg.cmd_tag
        when ReadyForQuery
          pq,@pending_query = @pending_query,nil
          pq.succeed true, @r, @e
        when RowDescription
          @r.fields = msg.fields
        when CopyInResponse
        when CopyOutResponse
        when EmptyQueryResponse
        when ErrorResponse
          # TODO
          @e << msg
        when NoticeResponse
          @notice_processor.call(msg) if @notice_processor
        else
          # TODO
        end
      end
    end
  end
end