mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/cluster/reapers/cursor_reaper.rb

Summary

Maintainability
B
5 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  class Cluster

    # A manager that sends kill cursors operations at regular intervals to close
    # cursors that have been garbage collected without being exhausted.
    #
    # @api private
    #
    # @since 2.3.0
    class CursorReaper
      include Retryable

      # The default time interval for the cursor reaper to send pending
      # kill cursors operations.
      #
      # Integers are always frozen in Ruby, so no explicit freeze is needed.
      #
      # @since 2.3.0
      FREQUENCY = 1

      # Build a cursor reaper bound to the given cluster.
      #
      # Sets up the reaper's bookkeeping: the queue that receives scheduled
      # kill specs, the mutex, the set of active cursor ids and the table of
      # kill specs pending per server address.
      #
      # @param [ Cluster ] cluster The cluster.
      #
      # @api private
      def initialize(cluster)
        @kill_spec_queue = Queue.new
        @mutex = Mutex.new
        @active_cursor_ids = Set.new
        @to_kill = {}
        @cluster = cluster
      end

      # @return [ Cluster ] The cluster given to the constructor.
      attr_reader :cluster

      # Enqueue a kill specification so that it is eventually executed.
      #
      # This method only places the spec on the reaper's internal queue; it
      # does not send anything itself.
      #
      # @param [ Cursor::KillSpec ] kill_spec The kill specification.
      #
      # @api private
      def schedule_kill_cursor(kill_spec)
        @kill_spec_queue.push(kill_spec)
      end

      # Add a cursor id to the set of active cursors.
      #
      # @example Register a cursor as active.
      #   cursor_reaper.register_cursor(id)
      #
      # @param [ Integer ] id The id of the cursor to register as active.
      #
      # @raise [ ArgumentError ] If the id is nil or 0.
      #
      # @api private
      #
      # @since 2.3.0
      def register_cursor(id)
        raise ArgumentError, 'register_cursor called with nil cursor_id' if id.nil?
        raise ArgumentError, 'register_cursor called with cursor_id=0' if id == 0

        # The active id set is only touched while holding the mutex.
        @mutex.synchronize { @active_cursor_ids.add(id) }
      end

      # Remove a cursor id from the set of active cursors, indicating that the
      # cursor is no longer active.
      #
      # @example Unregister a cursor.
      #   cursor_reaper.unregister_cursor(id)
      #
      # @param [ Integer ] id The id of the cursor to unregister.
      #
      # @raise [ ArgumentError ] If the id is nil or 0.
      #
      # @api private
      #
      # @since 2.3.0
      def unregister_cursor(id)
        raise ArgumentError, 'unregister_cursor called with nil cursor_id' if id.nil?
        raise ArgumentError, 'unregister_cursor called with cursor_id=0' if id == 0

        # The active id set is only touched while holding the mutex.
        @mutex.synchronize { @active_cursor_ids.delete(id) }
      end

      # Drain the queue of scheduled kill specs into the per-server table of
      # pending kills.
      #
      # Specs whose cursor id is not currently registered as active are
      # discarded. The queue is read without blocking, and the method ends
      # once no queued specs are left.
      #
      # This method mutates instance variables without locking, so it is not
      # thread safe. Generally, it should not be called directly; it is a
      # helper for the `kill_cursors` method.
      #
      # @api private
      def read_scheduled_kill_specs
        while (scheduled = @kill_spec_queue.pop(true))
          next unless @active_cursor_ids.include?(scheduled.cursor_id)

          address = scheduled.server_address
          (@to_kill[address] ||= Set.new) << scheduled
        end
      rescue ThreadError
        # The non-blocking pop raises once the queue is empty; nothing to do.
      end

      # Execute all pending kill cursors operations.
      #
      # Repeatedly takes one pending kill spec at a time and, if its cursor is
      # still registered as active and the cluster currently has a server with
      # the spec's address, executes a kill cursors operation for it. An
      # implicit session attached to a killed spec is ended afterwards. The
      # method returns once no server has pending specs left.
      #
      # A spec is removed from the pending set before the operation is
      # attempted, so a failed or skipped kill is not retried.
      #
      # Also available as `execute` and `flush`.
      #
      # @example Execute pending kill cursors operations.
      #   cursor_reaper.kill_cursors
      #
      # @return [ nil ] Always nil.
      #
      # @api private
      #
      # @since 2.3.0
      def kill_cursors
        # TODO optimize this to batch kill cursor operations for the same
        # server/database/collection instead of killing each cursor
        # individually.
        loop do
          server_address = nil

          kill_spec = @mutex.synchronize do
            read_scheduled_kill_specs
            # Find a server that has any cursors scheduled for destruction.
            server_address, pending =
              @to_kill.detect { |_address, queued| queued.any? }

            # All servers have empty specs, nothing to do.
            return if pending.nil?

            # Take the spec out of the pending set up front. If the kill
            # cursor operation fails, we don't attempt to kill that cursor
            # again.
            candidate = pending.first
            pending.delete(candidate)

            # A cursor that is no longer active was already killed, typically
            # because it has been iterated to completion. Yield nil in that
            # case so that the spec is dropped without doing any more work.
            candidate if @active_cursor_ids.include?(candidate.cursor_id)
          end

          # If there was a spec to kill but its cursor was already killed,
          # look for another spec.
          next unless kill_spec

          server = cluster.servers.detect do |known_server|
            known_server.address == server_address
          end

          unless server
            # TODO We currently don't have a server for the address that the
            # cursor is associated with. We should leave the cursor in the
            # queue to be killed at a later time (when the server comes back).
            #
            # NOTE(review): on this path the spec has already been removed
            # from the pending set and its implicit session, if any, is not
            # ended - confirm this is intended.
            next
          end

          # We could also pass kill_spec directly into the KillCursors
          # operation, though this would make that operation have a
          # different API from all of the other ones which accept hashes.
          spec = {
            cursor_ids: [kill_spec.cursor_id],
            coll_name: kill_spec.coll_name,
            db_name: kill_spec.db_name,
          }
          op = Operation::KillCursors.new(spec)

          options = {
            server_api: server.options[:server_api],
            connection_global_id: kill_spec.connection_global_id,
          }
          # NOTE(review): an exception raised here propagates to the caller
          # before the session handling below runs.
          op.execute(server, context: Operation::Context.new(options: options))

          # Only implicit sessions are ended here; other sessions are left
          # untouched.
          session = kill_spec.session
          session.end_session if session && session.implicit?
        end
      end
      alias :execute :kill_cursors
      alias :flush :kill_cursors
    end
  end
end