mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/protocol/msg.rb

Summary

Maintainability
B
4 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2017-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  module Protocol

    # MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire
    # protocol opcode.
    #
    # OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.
    #
    # @api private
    #
    # @since 2.5.0
    class Msg < Message
      include Monitoring::Event::Secure

      # The identifier for the database name to execute the command on.
      #
      # @since 2.5.0
      DATABASE_IDENTIFIER = '$db'.freeze

      # Keys that the driver adds to commands. These are going to be
      # moved to the end of the hash for better logging.
      #
      # @api private
      INTERNAL_KEYS = Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze

      # Creates a new OP_MSG protocol message
      #
      # @example Create a OP_MSG wire protocol message
      #   Msg.new([:more_to_come], {}, { hello: 1 },
      #           { type: 1, payload: { identifier: 'documents', sequence: [..] } })
      #
      # @param [ Array<Symbol> ] flags The flag bits. Currently supported
      #   values are :more_to_come and :checksum_present.
      # @param [ Hash ] options The options.
      # @param [ BSON::Document, Hash ] main_document The document that will
      #   become the payload type 0 section. Can contain global args as they
      #   are defined in the OP_MSG specification.
      # @param [ Protocol::Msg::Section1 ] sequences Zero or more payload type 1
      #   sections.
      #
      # @option options [ true, false ] validating_keys Whether keys should be
      #   validated for being valid document keys (i.e. not begin with $ and
      #   not contain dots).
      #   This option is deprecated and will not be used. It will removed in version 3.0.
      #
      # @api private
      #
      # @since 2.5.0
      def initialize(flags, options, main_document, *sequences)
        if flags
          flags.each do |flag|
            unless KNOWN_FLAGS.key?(flag)
              raise ArgumentError, "Unknown flag: #{flag.inspect}"
            end
          end
        end
        @flags = flags || []
        @options = options
        unless main_document.is_a?(Hash)
          raise ArgumentError, "Main document must be a Hash, given: #{main_document.class}"
        end
        @main_document = main_document
        sequences.each_with_index do |section, index|
          unless section.is_a?(Section1)
            raise ArgumentError, "All sequences must be Section1 instances, got: #{section} at index #{index}"
          end
        end
        @sequences = sequences
        @sections = [
          {type: 0, payload: @main_document}
        ] + @sequences.map do |section|
          {type: 1, payload: {
            identifier: section.identifier,
            sequence: section.documents.map do |doc|
              CachingHash.new(doc)
            end,
          }}
        end
        @request_id = nil
        super
      end

      # Whether the message expects a reply from the database.
      #
      # @example Does the message require a reply?
      #   message.replyable?
      #
      # @return [ true, false ] If the message expects a reply.
      #
      # @since 2.5.0
      def replyable?
        @replyable ||= !flags.include?(:more_to_come)
      end

      # Return the event payload for monitoring.
      #
      # @example Return the event payload.
      #   message.payload
      #
      # @return [ BSON::Document ] The event payload.
      #
      # @since 2.5.0
      def payload
        # Reorder keys in main_document for better logging - see
        # https://jira.mongodb.org/browse/RUBY-1591.
        # Note that even without the reordering, the payload is not an exact
        # match to what is sent over the wire because the command as used in
        # the published event combines keys from multiple sections of the
        # payload sent over the wire.
        ordered_command = {}
        skipped_command = {}
        command.each do |k, v|
          if INTERNAL_KEYS.member?(k.to_s)
            skipped_command[k] = v
          else
            ordered_command[k] = v
          end
        end
        ordered_command.update(skipped_command)

        BSON::Document.new(
          command_name: ordered_command.keys.first.to_s,
          database_name: @main_document[DATABASE_IDENTIFIER],
          command: ordered_command,
          request_id: request_id,
          reply: @main_document,
        )
      end

      # Serializes message into bytes that can be sent on the wire.
      #
      # @param [ BSON::ByteBuffer ] buffer where the message should be inserted.
      # @param [ Integer ] max_bson_size The maximum bson object size.
      #
      # @return [ BSON::ByteBuffer ] buffer containing the serialized message.
      #
      # @since 2.5.0
      def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
        validate_document_size!(max_bson_size)

        super
        add_check_sum(buffer)
        buffer
      end

      # Compress the message, if the command being sent permits compression.
      # Otherwise returns self.
      #
      # @param [ String, Symbol ] compressor The compressor to use.
      # @param [ Integer ] zlib_compression_level The zlib compression level to use.
      #
      # @return [ Message ] A Protocol::Compressed message or self,
      #  depending on whether this message can be compressed.
      #
      # @since 2.5.0
      # @api private
      def maybe_compress(compressor, zlib_compression_level = nil)
        compress_if_possible(command.keys.first, compressor, zlib_compression_level)
      end

      # Reverse-populates the instance variables after deserialization sets
      # the @sections instance variable to the list of documents.
      #
      # TODO fix deserialization so that this method is not needed.
      #
      # @api private
      def fix_after_deserialization
        if @sections.nil?
          raise NotImplementedError, "After deserializations @sections should have been initialized"
        end
        if @sections.length != 1
          raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
        end
        @main_document = @sections.first
        @sequences = []
        @sections = [{type: 0, payload: @main_document}]
      end

      def documents
        [@main_document]
      end

      # Possibly encrypt this message with libmongocrypt. Message will only be
      # encrypted if the specified client exists, that client has been given
      # auto-encryption options, the client has not been instructed to bypass
      # auto-encryption, and mongocryptd determines that this message is
      # eligible for encryption. A message is eligible for encryption if it
      # represents one of the command types allow-listed by libmongocrypt and it
      # contains data that is required to be encrypted by a local or remote json schema.
      #
      # @param [ Mongo::Server::Connection ] connection The connection on which
      #   the operation is performed.
      # @param [ Mongo::Operation::Context ] context The operation context.
      #
      # @return [ Mongo::Protocol::Msg ] The encrypted message, or the original
      #   message if encryption was not possible or necessary.
      def maybe_encrypt(connection, context)
        # TODO verify compression happens later, i.e. when this method runs
        # the message is not compressed.
        if context.encrypt?
          if connection.description.max_wire_version < 8
            raise Error::CryptError.new(
              "Cannot perform encryption against a MongoDB server older than " +
              "4.2 (wire version less than 8). Currently connected to server " +
              "with max wire version #{connection.description.max_wire_version}} " +
              "(Auto-encryption requires a minimum MongoDB version of 4.2)"
            )
          end

          db_name = @main_document[DATABASE_IDENTIFIER]
          cmd = merge_sections
          enc_cmd = context.encrypt(db_name, cmd)
          if cmd.key?('$db') && !enc_cmd.key?('$db')
            enc_cmd['$db'] = cmd['$db']
          end

          Msg.new(@flags, @options, enc_cmd)
        else
          self
        end
      end

      # Possibly decrypt this message with libmongocrypt. Message will only be
      # decrypted if the specified client exists, that client has been given
      # auto-encryption options, and this message is eligible for decryption.
      # A message is eligible for decryption if it represents one of the command
      # types allow-listed by libmongocrypt and it contains data that is required
      # to be encrypted by a local or remote json schema.
      #
      # @param [ Mongo::Operation::Context ] context The operation context.
      #
      # @return [ Mongo::Protocol::Msg ] The decrypted message, or the original
      #   message if decryption was not possible or necessary.
      def maybe_decrypt(context)
        if context.decrypt?
          cmd = merge_sections
          enc_cmd = context.decrypt(cmd)
          Msg.new(@flags, @options, enc_cmd)
        else
          self
        end
      end

      # Whether this message represents a bulk write. A bulk write is an insert,
      # update, or delete operation that encompasses multiple operations of
      # the same type.
      #
      # @return [ Boolean ] Whether this message represents a bulk write.
      #
      # @note This method was written to support client-side encryption
      #   functionality. It is not recommended that this method be used in
      #   service of any other feature or behavior.
      #
      # @api private
      def bulk_write?
        inserts = @main_document['documents']
        updates = @main_document['updates']
        deletes = @main_document['deletes']

        num_inserts = inserts && inserts.length || 0
        num_updates = updates && updates.length || 0
        num_deletes = deletes && deletes.length || 0

        num_inserts > 1  || num_updates > 1 || num_deletes > 1
      end

      def maybe_add_server_api(server_api)
        conflicts = {}
        %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
          if @main_document.key?(key)
            conflicts[key] = @main_document[key]
          end
          if @main_document.key?(key.to_s)
            conflicts[key] = @main_document[key.to_s]
          end
        end
        unless conflicts.empty?
          raise Error::ServerApiConflict, "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
        end

        main_document = @main_document.merge(
          Utils.transform_server_api(server_api)
        )
        Msg.new(@flags, @options, main_document, *@sequences)
      end

      # Returns the number of documents returned from the server.
      #
      # The Msg instance must be for a server reply and the reply must return
      # an active cursor (either a newly created one or one whose iteration is
      # continuing via getMore).
      #
      # @return [ Integer ] Number of returned documents.
      def number_returned
        if doc = documents.first
          if cursor = doc['cursor']
            if batch = cursor['firstBatch'] || cursor['nextBatch']
              return batch.length
            end
          end
        end
        raise NotImplementedError, "number_returned is only defined for cursor replies"
      end

      private

      # Validate that the documents in this message are all smaller than the
      # maxBsonObjectSize. If not, raise an exception.
      def validate_document_size!(max_bson_size)
        max_bson_size ||= Mongo::Server::ConnectionBase::DEFAULT_MAX_BSON_OBJECT_SIZE

        contains_too_large_document = @sections.any? do |section|
          section[:type] == 1 &&
            section[:payload][:sequence].any? do |document|
              document.to_bson.length > max_bson_size
            end
        end

        if contains_too_large_document
          raise Error::MaxBSONSize.new('The document exceeds maximum allowed BSON object size after serialization')
        end
      end

      def command
        @command ||= if @main_document
          @main_document.dup.tap do |cmd|
            @sequences.each do |section|
              cmd[section.identifier] ||= []
              cmd[section.identifier] += section.documents
            end
          end
        else
          documents.first
        end
      end

      def add_check_sum(buffer)
        if flags.include?(:checksum_present)
          #buffer.put_int32(checksum)
        end
      end

      # Encapsulates a type 1 OP_MSG section.
      #
      # @see https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md#sections
      #
      # @api private
      class Section1
        def initialize(identifier, documents)
          @identifier, @documents = identifier, documents
        end

        attr_reader :identifier, :documents

        def ==(other)
          other.is_a?(Section1) &&
            identifier == other.identifier && documents == other.documents
        end

        alias :eql? :==
      end

      # The operation code required to specify a OP_MSG message.
      # @return [ Fixnum ] the operation code.
      #
      # @since 2.5.0
      OP_CODE = 2013

      KNOWN_FLAGS = {
        checksum_present: true,
        more_to_come: true,
        exhaust_allowed: true,
      }

      # Available flags for a OP_MSG message.
      FLAGS = Array.new(16).tap do |arr|
        arr[0] = :checksum_present
        arr[1] = :more_to_come
        arr[16] = :exhaust_allowed
      end.freeze

      # @!attribute
      # @return [Array<Symbol>] The flags for this message.
      field :flags, BitVector.new(FLAGS)

      # The sections that will be serialized, or the documents have been
      # deserialized.
      #
      # Usually the sections contain OP_MSG-compliant sections derived
      # from @main_document and @sequences. The information in @main_document
      # and @sequences is duplicated in the sections.
      #
      # When deserializing Msg instances, sections temporarily is an array
      # of documents returned in the type 0 section of the OP_MSG wire
      # protocol message. #fix_after_deserialization method mutates this
      # object to have sections, @main_document and @sequences be what
      # they would have been had the Msg instance been constructed using
      # the constructor (rather than having been deserialized).
      #
      # @return [ Array<Hash> | Array<BSON::Document> ] The sections of
      #   payload type 1 or 0.
      # @api private
      field :sections, Sections

      Registry.register(OP_CODE, self)
    end
  end
end