mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/bulk_write.rb

Summary

Maintainability
D
2 days
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/bulk_write/result'
require 'mongo/bulk_write/transformable'
require 'mongo/bulk_write/validatable'
require 'mongo/bulk_write/combineable'
require 'mongo/bulk_write/ordered_combiner'
require 'mongo/bulk_write/unordered_combiner'
require 'mongo/bulk_write/result_combiner'

module Mongo
  class BulkWrite
    extend Forwardable
    include Operation::ResponseHandling

    # @return [ Mongo::Collection ] collection The collection.
    attr_reader :collection

    # @return [ Array<Hash, BSON::Document> ] requests The requests.
    attr_reader :requests

    # @return [ Hash, BSON::Document ] options The options.
    attr_reader :options

    # Delegate various methods to the collection.
    def_delegators :@collection,
                   :database,
                   :cluster,
                   :write_with_retry,
                   :nro_write_with_retry,
                   :next_primary

    def_delegators :database, :client

    # Execute the bulk write operation.
    #
    # @example Execute the bulk write.
    #   bulk_write.execute
    #
    # @return [ Mongo::BulkWrite::Result ] The result.
    #
    # @since 2.1.0
    def execute
      operation_id = Monitoring.next_operation_id
      result_combiner = ResultCombiner.new
      operations = op_combiner.combine
      validate_requests!

      client.send(:with_session, @options) do |session|
        context = Operation::Context.new(client: client, session: session)
        operations.each do |operation|
          if single_statement?(operation)
            write_concern = write_concern(session)
            write_with_retry(write_concern, context: context) do |connection, txn_num, context|
              execute_operation(
                operation.keys.first,
                operation.values.flatten,
                connection,
                context,
                operation_id,
                result_combiner,
                session,
                txn_num)
            end
          else
            nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
              execute_operation(
                operation.keys.first,
                operation.values.flatten,
                connection,
                context,
                operation_id,
                result_combiner,
                session)
            end
          end
        end
      end
      result_combiner.result
    end

    # Create the new bulk write operation.
    #
    # @api private
    #
    # @example Create an ordered bulk write.
    #   Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])
    #
    # @example Create an unordered bulk write.
    #   Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)
    #
    # @example Create an ordered mixed bulk write.
    #   Mongo::BulkWrite.new(
    #     collection,
    #     [
    #       { insert_one: { _id: 1 }},
    #       { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}},
    #       { delete_one: { filter: { _id: 2 }}}
    #     ]
    #   )
    #
    # @param [ Mongo::Collection ] collection The collection.
    # @param [ Enumerable<Hash, BSON::Document> ] requests The requests,
    #   cannot be empty.
    # @param [ Hash, BSON::Document ] options The options.
    #
    # @since 2.1.0
    def initialize(collection, requests, options = {})
      @collection = collection
      @requests = requests
      @options = options || {}
    end

    # Is the bulk write ordered?
    #
    # @api private
    #
    # @example Is the bulk write ordered?
    #   bulk_write.ordered?
    #
    # @return [ true, false ] If the bulk write is ordered.
    #
    # @since 2.1.0
    def ordered?
      @ordered ||= options.fetch(:ordered, true)
    end

    # Get the write concern for the bulk write.
    #
    # @api private
    #
    # @example Get the write concern.
    #   bulk_write.write_concern
    #
    # @return [ WriteConcern ] The write concern.
    #
    # @since 2.1.0
    def write_concern(session = nil)
      @write_concern ||= options[:write_concern] ?
        WriteConcern.get(options[:write_concern]) :
        collection.write_concern_with_session(session)
    end

    private

    SINGLE_STATEMENT_OPS = [ :delete_one,
                             :update_one,
                             :insert_one ].freeze

    def single_statement?(operation)
      SINGLE_STATEMENT_OPS.include?(operation.keys.first)
    end

    def base_spec(operation_id, session)
      {
        :db_name => database.name,
        :coll_name => collection.name,
        :write_concern => write_concern(session),
        :ordered => ordered?,
        :operation_id => operation_id,
        :bypass_document_validation => !!options[:bypass_document_validation],
        :max_time_ms => options[:max_time_ms],
        :options => options,
        :id_generator => client.options[:id_generator],
        :session => session,
        :comment => options[:comment],
        :let => options[:let],
      }
    end

    def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
      validate_collation!(connection)
      validate_array_filters!(connection)
      validate_hint!(connection)

      unpin_maybe(session, connection) do
        if values.size > connection.description.max_write_batch_size
          split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
        else
          result = send(name, values, connection, context, operation_id, session, txn_num)

          add_server_diagnostics(connection) do
            add_error_labels(connection, context) do
              result_combiner.combine!(result, values.size)
            end
          end
        end
      end
    # With OP_MSG (3.6+ servers), the size of each section in the message
    # is independently capped at 16m and each bulk operation becomes
    # its own section. The size of the entire bulk write is limited to 48m.
    # With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a
    # single document and is thus subject to the 16m document size limit.
    # This means the splits differ between pre-3.6 and 3.6+ servers, with
    # 3.6+ servers being able to split less.
    rescue Error::MaxBSONSize, Error::MaxMessageSize => e
      raise e if values.size <= 1
      unpin_maybe(session, connection) do
        split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
      end
    end

    def op_combiner
      @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
    end

    def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
      execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

      txn_num = session.next_txn_num if txn_num && !session.in_transaction?
      execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    end

    def delete_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
      Operation::Delete.new(spec).bulk_execute(connection, context: context)
    end

    def delete_many(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:deletes => documents)
      Operation::Delete.new(spec).bulk_execute(connection, context: context)
    end

    def insert_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
      Operation::Insert.new(spec).bulk_execute(connection, context: context)
    end

    def update_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
      Operation::Update.new(spec).bulk_execute(connection, context: context)
    end
    alias :replace_one :update_one

    def update_many(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:updates => documents)
      Operation::Update.new(spec).bulk_execute(connection, context: context)
    end

    private

    def validate_collation!(connection)
      if op_combiner.has_collation? && !connection.features.collation_enabled?
        raise Error::UnsupportedCollation.new
      end
    end

    def validate_array_filters!(connection)
      if op_combiner.has_array_filters? && !connection.features.array_filters_enabled?
        raise Error::UnsupportedArrayFilters.new
      end
    end

    def validate_hint!(connection)
      if op_combiner.has_hint?
        if !can_hint?(connection) && write_concern && !write_concern.acknowledged?
          raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
        elsif !connection.features.update_delete_option_validation_enabled?
          raise Error::UnsupportedOption.hint_error
        end
      end
    end

    # Loop through the requests and check if each operation is allowed to send
    # a hint for each operation on the given server version.
    #
    # For the following operations, the client can send a hint for servers >= 4.2
    # and for the rest, the client can only send it for 4.4+:
    #   - updateOne
    #   - updateMany
    #   - replaceOne
    #
    # @param [ Connection ] connection The connection object.
    #
    # @return [ true | false ] Whether the request is able to send hints for
    #   the current server version.
    def can_hint?(connection)
      gte_4_2 = connection.server.description.server_version_gte?('4.2')
      gte_4_4 = connection.server.description.server_version_gte?('4.4')
      op_combiner.requests.all? do |req|
        op = req.keys.first
        if req[op].keys.include?(:hint)
          if [:update_one, :update_many, :replace_one].include?(op)
            gte_4_2
          else
            gte_4_4
          end
        else
          true
        end
      end
    end

    # Perform the request document validation required by driver specifications.
    # This method validates the first key of each update request document to be
    # an operator (i.e. start with $) and the first key of each replacement
    # document to not be an operator (i.e. not start with $). The request document
    # may be invalid without this method flagging it as such (for example an
    # update or replacement document containing some keys which are operators
    # and some which are not), in which case the driver expects the server to
    # fail the operation with an error.
    #
    # Raise an ArgumentError if requests is empty.
    #
    # @raise [ Error::InvalidUpdateDocument, Error::InvalidReplacementDocument,
    #   ArgumentError ]
    #   if the document is invalid.
    def validate_requests!
      requests_empty = true
      @requests.each do |req|
        requests_empty = false
        if op = req.keys.first
          if [:update_one, :update_many].include?(op)
            if doc = maybe_first(req.dig(op, :update))
              if key = doc.keys&.first
                unless key.to_s.start_with?("$")
                  if Mongo.validate_update_replace
                    raise Error::InvalidUpdateDocument.new(key: key)
                  else
                    Error::InvalidUpdateDocument.warn(Logger.logger, key)
                  end
                end
              end
            end
          elsif op == :replace_one
            if key = req.dig(op, :replacement)&.keys&.first
              if key.to_s.start_with?("$")
                if Mongo.validate_update_replace
                  raise Error::InvalidReplacementDocument.new(key: key)
                else
                  Error::InvalidReplacementDocument.warn(Logger.logger, key)
                end
              end
            end
          end
        end
      end.tap do
        raise ArgumentError, "Bulk write requests cannot be empty" if requests_empty
      end
    end

    # If the given object is an array return the first element, otherwise
    # return the given object.
    #
    # @param [ Object ] obj The given object.
    #
    # @return [ Object ] The first element of the array or the given object.
    def maybe_first(obj)
      obj.is_a?(Array) ? obj.first : obj
    end
  end
end