mongodb/mongoid

View on GitHub
lib/mongoid/contextual/mongo/documents_loader.rb

Summary

Maintainability
A
0 mins
Test Coverage
# rubocop:todo all
require "mongoid/association/eager_loadable"

module Mongoid
  module Contextual
    class Mongo
      # Loads documents for the provided criteria.
      #
      # @api private
      class DocumentsLoader
        extend Forwardable
        include Association::EagerLoadable

        def_delegators :@future, :value!, :value, :wait!, :wait

        # Returns synchronous executor to be used when async_query_executor config option
        # is set to :immediate. This executor runs all operations on the current
        # thread, blocking as necessary.
        #
        # @return [ Concurrent::ImmediateExecutor ] The executor
        #   to be used to execute document loading tasks.
        def self.immediate_executor
          @@immediate_executor ||= Concurrent::ImmediateExecutor.new
        end

        # Returns asynchronous executor to be used when async_query_executor config option
        # is set to :global_thread_pool. This executor runs operations on background threads
        # using a thread pool.
        #
        # @return [ Concurrent::ThreadPoolExecutor ] The executor
        #   to be used to execute document loading tasks.
        def self.global_thread_pool_async_query_executor
          create_pool = Proc.new do |concurrency|
            Concurrent::ThreadPoolExecutor.new(
              min_threads: 0,
              max_threads: concurrency,
              max_queue: concurrency * 4,
              fallback_policy: :caller_runs
            )
          end
          concurrency = Mongoid.global_executor_concurrency || 4
          @@global_thread_pool_async_query_executor ||= create_pool.call(concurrency)
          if @@global_thread_pool_async_query_executor.max_length != concurrency
            old_pool = @@global_thread_pool_async_query_executor
            @@global_thread_pool_async_query_executor = create_pool.call(concurrency)
            old_pool.shutdown
          end
          @@global_thread_pool_async_query_executor
        end

        # Returns suitable executor according to Mongoid config options.
        #
        # @param [ String | Symbol] name The query executor name, can be either
        #   :immediate or :global_thread_pool. Defaulted to `async_query_executor`
        #   config option.
        #
        # @return [ Concurrent::ImmediateExecutor | Concurrent::ThreadPoolExecutor ] The executor
        #   to be used to execute document loading tasks.
        #
        # @raise [ Errors::InvalidQueryExecutor ] If an unknown name is provided.
        def self.executor(name = Mongoid.async_query_executor)
          case name.to_sym
          when :immediate
            immediate_executor
          when :global_thread_pool
            global_thread_pool_async_query_executor
          else
            raise Errors::InvalidQueryExecutor.new(name)
          end
        end

        # @return [ Mongoid::Criteria ] Criteria that specifies which documents should
        #   be loaded. Exposed here because `eager_loadable?` method from
        #   `Association::EagerLoadable` expects this to be available.
        attr_accessor :criteria

        # Instantiates the document loader instance and immediately schedules
        # its execution using the provided executor.
        #
        # @param [ Mongo::Collection::View ] view The collection view to get
        #   records from the database.
        # @param [ Class ] klass Mongoid model class to instantiate documents.
        #   All records obtained from the database will be converted to an
        #   instance of this class, if possible.
        # @param [ Mongoid::Criteria ] criteria Criteria that specifies which
        #   documents should be loaded.
        # @param [ Concurrent::AbstractExecutorService ] executor Executor that
        #   is capable of running `Concurrent::Promises::Future` instances.
        def initialize(view, klass, criteria, executor: self.class.executor)
          @view = view
          @klass = klass
          @criteria = criteria
          @mutex = Mutex.new
          @state = :pending
          @future = Concurrent::Promises.future_on(executor) do
            start && execute
          end
        end

        # Returns false or true whether the loader is in pending state.
        #
        # Pending state means that the loader execution has been scheduled,
        # but has not been started yet.
        #
        # @return [ true | false ] true if the loader is in pending state,
        #   otherwise false.
        def pending?
          @mutex.synchronize do
            @state == :pending
          end
        end

        # Returns false or true whether the loader is in started state.
        #
        # Started state means that the loader execution has been started.
        # Note that the loader stays in this state even after the execution
        # completed (successfully or failed).
        #
        # @return [ true | false ] true if the loader is in started state,
        #   otherwise false.
        def started?
          @mutex.synchronize do
            @state == :started
          end
        end

        # Mark the loader as unscheduled.
        #
        # If the loader is marked unscheduled, it will not be executed. The only
        # option to load the documents is to call `execute` method directly.
        #
        # Please note that if execution of a task has been already started,
        # unscheduling does not have any effect.
        def unschedule
          @mutex.synchronize do
            @state = :cancelled unless @state == :started
          end
        end

        # Loads records specified by `@criteria` from the database, and convert
        # them to Mongoid documents of `@klass` type.
        #
        # This method is called by the task (possibly asynchronous) scheduled
        # when creating an instance of the loader. However, this method can be
        # called directly, if it is desired to execute loading on the caller
        # thread immediately.
        #
        # Calling this method does not change the state of the loader.
        #
        # @return [ Array<Mongoid::Document> ] Array of document loaded from
        #   the database.
        def execute
          documents = @view.map do |doc|
            Factory.from_db(@klass, doc, @criteria)
          end
          eager_load(documents) if eager_loadable?
          documents
        end

        private

        # Mark the loader as started if possible.
        #
        # @return [ true | false ] Whether the state was changed to :started.
        def start
          @mutex.synchronize do
            if @state == :pending
              @state = :started
              true
            else
              false
            end
          end
        end
      end
    end
  end
end