# mongoid/moped
#
# View on GitHub
# lib/moped/protocol/message.rb
#
# Summary
#
# Maintainability
# A
# 2 hrs
# Test Coverage
module Moped
  module Protocol
    # The base module for building all messages needed to implement the Mongo
    # Wire Protocol. Including it extends the class with a minimal DSL for
    # defining typed fields for serialization and deserialization over the
    # wire.
    #
    # @example
    #
    #   class KillCursors
    #     include Moped::Protocol::Message
    #
    #     # header fields
    #     int32 :length
    #     int32 :request_id
    #     int32 :response_to
    #     int32 :op_code
    #
    #     # message fields
    #     int32 :reserved
    #     int32 :number_of_cursors
    #     int64 :cursor_ids, type: :array
    #
    #     # Customize field reader
    #     def number_of_cursors
    #       cursor_ids.length
    #     end
    #   end
    #
    # Note that all messages *must* implement the header fields required by the
    # Mongo Wire Protocol, namely:
    #
    #   int32 :length
    #   int32 :request_id
    #   int32 :response_to
    #   int32 :op_code
    #
    module Message
      INT32_DECODE_STR       = 'l<'
      INT64_DECODE_ARRAY_STR = 'q<*'
      INT64_DECODE_STR       = 'q<'

      # Default implementation for a message is to do nothing when receiving
      # replies.
      #
      # @example Receive replies.
      #   message.receive_replies(connection)
      #
      # @param [ Connection ] connection The connection.
      #
      # @return [ nil ] nil.
      #
      # @since 1.0.0
      def receive_replies(connection)
        # Deliberate no-op: by default a message does nothing when receiving
        # replies.
        nil
      end

      # Serializes the message and all of its fields to a new buffer or to the
      # provided buffer.
      #
      # @example Serialize the message.
      #   message.serialize
      #
      # @param [ String ] buffer A buffer to serialize to.
      #
      # @return [ String ] The result of serializing this message
      #
      # @since 1.0.0
      def serialize(buffer = "")
        # Placeholder only: the working implementation is generated per class
        # by ClassMethods#finalize.
        reason = "This method is generated after calling #finalize on a message class"
        raise NotImplementedError, reason
      end
      alias to_s serialize

      # @return [String] the nicely formatted version of the message
      def inspect
        # One "@field=value" entry per declared field, read through the
        # field's own reader so customized readers are honoured.
        rendered = self.class.fields.map do |field|
          "@#{field}=#{__send__(field).inspect}"
        end

        "#<#{self.class.name}\n  #{rendered.join("\n  ")}>"
      end
      class << self
        private

        # Hook run when this module is included: extends the including class
        # or module with +ClassMethods+, which provides the field DSL.
        #
        # @param [Module] base the class or module including Message
        def included(base)
          super
          base.extend ClassMethods
        end
      end

      # Provides a DSL for defining struct-like fields for building messages
      # for the Mongo Wire.
      #
      # @example
      #   class Command
      #     extend Message::ClassMethods
      #
      #     int32 :length
      #   end
      #
      #   Command.fields # => [:length]
      #   command = Command.new
      #   command.length = 12
      #   command.serialize_length("") # => "\f\x00\x00\x00"
      module ClassMethods

        # @return [Array] the fields defined for this message
        def fields
          # Lazily created, then memoized on the receiver.
          return @fields if defined?(@fields) && @fields

          @fields = []
        end

        # Declare a null terminated string field.
        #
        # @example
        #   class Query < Message
        #     cstring :collection
        #   end
        #
        # @param [String] name the name of this field
        def cstring(name)
          attr_accessor name

          # The generated serializer appends the field's value and then a
          # terminating 0 to the buffer.
          module_eval(<<-SRC, __FILE__, __LINE__ + 1)
            def serialize_#{name}(buffer)
              buffer << #{name}
              buffer << 0
            end
          SRC

          fields.push(name)
        end

        # Declare a BSON Document field.
        #
        # @example
        #   class Update < Message
        #     document :selector
        #   end
        #
        # @example optional document field
        #   class Query < Message
        #     document :selector
        #     document :fields, optional: true
        #   end
        #
        # @example array of documents
        #   class Reply < Message
        #     document :documents, type: :array
        #   end
        #
        # @param [String] name the name of this field
        # @param [Hash] options the options for this field
        # @option options [:array] :type specify an array of documents
        # @option options [Boolean] :optional specify this field as optional
        def document(name, options = {})
          attr_accessor name

          # Pick the serializer body for the requested variant. :optional is
          # checked first, so it wins when both options are given.
          body =
            if options[:optional]
              "buffer << #{name}.to_bson if #{name}"
            elsif options[:type] == :array
              "#{name}.each { |document| buffer << document.to_bson }"
            else
              "buffer << #{name}.to_bson"
            end

          class_eval <<-RUBY, __FILE__, __LINE__ + 1
            def serialize_#{name}(buffer)
              #{body}
            end
          RUBY

          fields << name
        end

        # Declare a flag field (32 bit signed integer)
        #
        # @example
        #   class Update < Message
        #     flags :flags, upsert: 2 ** 0,
        #                   multi:  2 ** 1
        #   end
        #
        # @param [String] name the name of this field
        # @param [Hash{Symbol => Number}] flag_map the flags for this flag field
        def flags(name, flag_map = {})
          # One generated statement per flag for each conversion direction:
          # symbols -> bit mask, and bit mask -> symbols.
          encode_lines = flag_map.map do |flag, value|
            "bits |= #{value} if flags.include? #{flag.inspect}"
          end
          decode_lines = flag_map.map do |flag, value|
            "flags << #{flag.inspect} if #{value} & bits == #{value}"
          end

          class_eval <<-RUBY, __FILE__, __LINE__ + 1
            def #{name}
              @#{name} ||= []
            end

            def #{name}=(flags)
              @#{name} = flags.is_a?(Numeric) ? #{name}_from_int(flags) : flags
            end

            def #{name}_as_int
              flags = self.#{name}
              bits = 0
              #{encode_lines.join("\n")}
              bits
            end

            def #{name}_from_int(bits)
              flags = []
              #{decode_lines.join("\n")}
              flags
            end

            def serialize_#{name}(buffer)
              buffer << [#{name}_as_int].pack(INT32_DECODE_STR)
            end

            def deserialize_#{name}(buffer)
              self.#{name} = buffer.read(4).unpack(INT32_DECODE_STR).first
            end
          RUBY

          fields << name
        end

        # Declare a 32 bit signed integer field.
        #
        # @example
        #   class Query < Message
        #     int32 :length
        #   end
        #
        # @param [String] name the name of this field
        def int32(name)
          attr_writer name

          module_eval(<<-SRC, __FILE__, __LINE__ + 1)
            def #{name}
              @#{name} ||= 0
            end

            def serialize_#{name}(buffer)
              buffer << [#{name}].pack(INT32_DECODE_STR)
            end

            def deserialize_#{name}(buffer)
              values = buffer.read(4).unpack(INT32_DECODE_STR)
              self.#{name} = values.first
              values
            end
          SRC

          fields.push(name)
        end

        # Declare a 64 bit signed integer field.
        #
        # @example
        #   class Query < Message
        #     int64 :cursor_id
        #   end
        #
        # @example with array type
        #   class KillCursors < Message
        #     int64 :cursor_ids, type: :array
        #   end
        #
        # @param [String] name the name of this field
        # @param [Hash] options the options for this field
        # @option options [:array] :type specify an array of 64 bit ints
        def int64(name, options = {})
          attr_writer name

          # The array and scalar variants share one template and differ only
          # in the default value and the serializer/deserializer bodies.
          if options[:type] == :array
            default = "[]"
            writer  = "buffer << #{name}.pack(INT64_DECODE_ARRAY_STR)"
            reader  = "raise NotImplementedError"
          else
            default = "0"
            writer  = "buffer << [#{name}].pack(INT64_DECODE_STR)"
            reader  = "self.#{name}, = buffer.read(8).unpack(INT64_DECODE_STR)"
          end

          class_eval <<-RUBY, __FILE__, __LINE__ + 1
            def #{name}
              @#{name} ||= #{default}
            end

            def serialize_#{name}(buffer)
              #{writer}
            end

            def deserialize_#{name}(buffer)
              #{reader}
            end
          RUBY

          fields << name
        end

        # Declares the message class as complete, and defines its serialization
        # method from the declared fields.
        def finalize
          # One "serialize_<field>(buffer)" call per declared field, in
          # declaration order.
          field_writes = fields.map { |f| "serialize_#{f}(buffer)" }.join("\n")

          class_eval <<-EOS, __FILE__, __LINE__ + 1
            def serialize(buffer = "")
              start = buffer.bytesize
              #{field_writes}
              self.length = buffer.bytesize - start

              # Overwrite the first four bytes written for this message
              # (expected to be the length field) with the real length.
              # start is a byte offset while String#[]= indexes by character,
              # so the splice is only used when the two coincide; otherwise
              # the bytes are written in place to avoid patching the wrong
              # position after multibyte characters.
              header = serialize_length("")
              if buffer.bytesize == buffer.length
                buffer[start, 4] = header
              else
                header.each_byte.with_index do |byte, offset|
                  buffer.setbyte(start + offset, byte)
                end
              end
              buffer
            end
            alias :to_s :serialize
          EOS
        end

        private

        # This ensures that subclasses of the primary wire message classes have
        # identical fields.
        def inherited(subclass)
          super
          # Copy rather than share: the subclass gets its own array holding
          # the parent's field names.
          parent_fields = fields
          subclass.fields.replace(parent_fields)
        end
      end
    end
  end
end