mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/server/connection_base.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2019-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # This class encapsulates common connection functionality.
    #
    # @note Although methods of this module are part of the public API,
    #   the fact that these methods are defined on this module and not on
    #   the classes which include this module is not part of the public API.
    #
    # @api semipublic
    class ConnectionBase < ConnectionCommon
      extend Forwardable
      include Monitoring::Publishable

      # The maximum allowed size in bytes that a user-supplied document may
      # take up when serialized, if the server's hello response does not
      # include maxBsonObjectSize field.
      #
      # The commands that are sent to the server may exceed this size by
      # MAX_BSON_COMMAND_OVERHEAD.
      #
      # @api private
      DEFAULT_MAX_BSON_OBJECT_SIZE = 16777216

      # The additional overhead allowed for command data (i.e. fields added
      # to the command document by the driver, as opposed to documents
      # provided by the user) when serializing a complete command to BSON.
      #
      # @api private
      MAX_BSON_COMMAND_OVERHEAD = 16384

      # @api private
      REDUCED_MAX_BSON_SIZE = 2097152

      # @return [ Hash ] options The passed in options.
      attr_reader :options

      # @return [ Server ] The server that this connection is for.
      #
      # @api private
      attr_reader :server

      # @return [ Mongo::Address ] address The address to connect to.
      def_delegators :server, :address

      # @deprecated
      def_delegators :server,
                     :cluster_time,
                     :update_cluster_time

      # Returns the server description for this connection, derived from
      # the hello response for the handshake performed on this connection.
      #
      # @note A connection object that hasn't yet connected (handshaken and
      #   authenticated, if authentication is required) does not have a
      #   description. While handshaking and authenticating the driver must
      #   be using global defaults, in particular not assuming that the
      #   properties of a particular connection are the same as properties
      #   of other connections made to the same address (since the server
      #   on the other end could have been shut down and a different server
      #   version could have been launched).
      #
      # @return [ Server::Description ] Server description for this connection.
      # @api private
      attr_reader :description

      # @deprecated
      def_delegators :description,
        :features,
        :max_bson_object_size,
        :max_message_size,
        :mongos?

      # @return [ nil | Object ] The service id, if any.
      def service_id
        description&.service_id
      end

      # Connection pool generation from which this connection was created.
      # May be nil.
      #
      # @return [ Integer | nil ] Connection pool generation.
      def generation
        # If the connection is to a load balancer, @generation is set
        # after handshake completes. If the connection is to another server
        # type, generation is specified during connection creation.
        @generation || options[:generation]
      end

      def app_metadata
        @app_metadata ||= begin
          same = true
          AppMetadata::AUTH_OPTION_KEYS.each do |key|
            if @server.options[key] != options[key]
              same = false
              break
            end
          end
          if same
            @server.app_metadata
          else
            AppMetadata.new(options.merge(purpose: @server.app_metadata.purpose))
          end
        end
      end

      # Dispatch a single message to the connection. If the message
      # requires a response, a reply will be returned.
      #
      # @example Dispatch the message.
      #   connection.dispatch([ insert ])
      #
      # @note This method is named dispatch since 'send' is a core Ruby method on
      #   all objects.
      #
      # @note For backwards compatibility, this method accepts the messages
      #   as an array. However, exactly one message must be given per invocation.
      #
      # @param [ Array<Message> ] messages A one-element array containing
      #   the message to dispatch.
      # @param [ Operation::Context ] context The operation context.
      # @param [ Hash ] options
      #
      # @option options [ Boolean ] :deserialize_as_bson Whether to deserialize
      #   the response to this message using BSON objects in place of native
      #   Ruby types wherever possible.
      #
      # @return [ Protocol::Message | nil ] The reply if needed.
      #
      # @raise [ Error::SocketError | Error::SocketTimeoutError ] When there is a network error.
      #
      # @since 2.0.0
      def dispatch(messages, context, options = {})
        # The monitoring code does not correctly handle multiple messages,
        # and the driver internally does not send more than one message at
        # a time ever. Thus prohibit multiple message use for now.
        if messages.length != 1
          raise ArgumentError, 'Can only dispatch one message at a time'
        end
        if description.unknown?
          raise Error::InternalDriverError, "Cannot dispatch a message on a connection with unknown description: #{description.inspect}"
        end
        message = messages.first
        deliver(message, context, options)
      end

      private

      # @raise [ Error::SocketError | Error::SocketTimeoutError ] When there is a network error.
      def deliver(message, context, options = {})
        if Lint.enabled? && !@socket
          raise Error::LintError, "Trying to deliver a message over a disconnected connection (to #{address})"
        end
        buffer = serialize(message, context)
        ensure_connected do |socket|
          operation_id = Monitoring.next_operation_id
          started_event = command_started(address, operation_id, message.payload,
            socket_object_id: socket.object_id, connection_id: id,
            connection_generation: generation,
            server_connection_id: description.server_connection_id,
            service_id: description.service_id,
          )
          start = Utils.monotonic_time
          result = nil
          begin
            result = add_server_diagnostics do
              socket.write(buffer.to_s)
              if message.replyable?
                Protocol::Message.deserialize(socket, max_message_size, message.request_id, options)
              else
                nil
              end
            end
          rescue Exception => e
            total_duration = Utils.monotonic_time - start
            command_failed(nil, address, operation_id, message.payload,
              e.message, total_duration,
              started_event: started_event,
              server_connection_id: description.server_connection_id,
              service_id: description.service_id,
            )
            raise
          else
            total_duration = Utils.monotonic_time - start
            command_completed(result, address, operation_id, message.payload,
              total_duration,
              started_event: started_event,
              server_connection_id: description.server_connection_id,
              service_id: description.service_id,
            )
          end
          if result && context.decrypt?
            result = result.maybe_decrypt(context)
          end
          result
        end
      end

      def serialize(message, context, buffer = BSON::ByteBuffer.new)
        # Driver specifications only mandate the fixed 16MiB limit for
        # serialized BSON documents. However, the server returns its
        # active serialized BSON document size limit in the hello response,
        # which is +max_bson_object_size+ below. The +DEFAULT_MAX_BSON_OBJECT_SIZE+
        # is the 16MiB value mandated by the specifications which we use
        # only as the default if the server's hello did not contain
        # maxBsonObjectSize.
        max_bson_size = max_bson_object_size || DEFAULT_MAX_BSON_OBJECT_SIZE
        if context.encrypt?
          # The client-side encryption specification requires bulk writes to
          # be split at a reduced maxBsonObjectSize. If this message is a bulk
          # write and its size exceeds the reduced size limit, the serializer
          # will raise an exception, which is caught by BulkWrite. BulkWrite
          # will split the operation into individual writes, which will
          # not be subject to the reduced maxBsonObjectSize.
          if message.bulk_write?
            # Make the new maximum size equal to the specified reduced size
            # limit plus the 16KiB overhead allowance.
            max_bson_size = REDUCED_MAX_BSON_SIZE
          end
        end

        # RUBY-2234: It is necessary to check that the message size does not
        # exceed the maximum bson object size before compressing and serializing
        # the final message.
        #
        # This is to avoid the case where the user performs a bulk write
        # larger than 16MiB which, when compressed, becomes smaller than 16MiB.
        # If the driver does not split the bulk writes prior to compression,
        # the entire operation will be sent to the server, which will raise an
        # error because the uncompressed operation exceeds the maximum bson size.
        #
        # To address this problem, we serialize the message prior to compression
        # and raise an exception if the serialized message exceeds the maximum
        # bson size.
        if max_message_size
          # Create a separate buffer that contains the un-compressed message
          # for the purpose of checking its size. Write any pre-existing contents
          # from the original buffer into the temporary one.
          temp_buffer = BSON::ByteBuffer.new

          # TODO: address the fact that this line mutates the buffer.
          temp_buffer.put_bytes(buffer.get_bytes(buffer.length))

          message.serialize(temp_buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD)
          if temp_buffer.length > max_message_size
            raise Error::MaxMessageSize.new(max_message_size)
          end
        end

        # RUBY-2335: When the un-compressed message is smaller than the maximum
        # bson size limit, the message will be serialized twice. The operations
        # layer should be refactored to allow compression on an already-
        # serialized message.
        final_message = message.maybe_compress(compressor, options[:zlib_compression_level])
        final_message.serialize(buffer, max_bson_size, MAX_BSON_COMMAND_OVERHEAD)

        buffer
      end
    end
  end
end