mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/collection/view/iterable.rb

Summary

Maintainability
B
5 hrs
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Collection
    class View

      # Defines iteration related behavior for collection views, including
      # cursor instantiation.
      #
      # @since 2.0.0
      module Iterable

        # Returns the cursor associated with this view, if any.
        #
        # @return [ nil | Cursor ] The cursor, if any.
        #
        # @api private
        attr_reader :cursor

        # Iterate through documents returned by a query with this +View+.
        #
        # @example Iterate through the result of the view.
        #   view.each do |document|
        #     p document
        #   end
        #
        # @return [ Enumerator | Object ] The enumerator when no block is given;
        #   otherwise the result of iterating over the documents.
        #
        # @since 2.0.0
        #
        # @yieldparam [ Hash ] Each matching document.
        def each
          # A cached cursor can only be reused when it holds the complete
          # result set (fully iterated) or is still open. A closed caching
          # cursor that was not fully iterated has an incomplete result set
          # that cannot be completed, so it is discarded and the query is
          # issued again. The open-but-partially-iterated case isn't tested -
          # see RUBY-2773.
          reusable = use_query_cache? && cached_cursor &&
            (cached_cursor.fully_iterated? || !cached_cursor.closed?)

          if reusable
            @cursor = cached_cursor
          else
            session = client.send(:get_session, @options)
            fresh_cursor = select_cursor(session)
            # Store the newly created cursor in the query cache under this
            # view's cache options.
            QueryCache.set(fresh_cursor, **cache_options) if use_query_cache?
            @cursor = fresh_cursor
          end

          # With the query cache in use, results of an earlier query with the
          # same or a larger limit may be served, so the (possibly lower) limit
          # of this view is imposed during iteration.
          limit_for_cached_query =
            if use_query_cache? && respond_to?(:limit)
              QueryCache.normalized_limit(limit)
            end

          return @cursor.to_enum unless block_given?

          # Ruby versions 2.5 and older do not support arr[0..nil] syntax, so
          # the unlimited case is handled separately from the slicing case.
          documents = if limit_for_cached_query
            @cursor.to_a[0...limit_for_cached_query]
          else
            @cursor
          end

          documents.each { |doc| yield doc }
        end

        # Cleans up resources associated with this query.
        #
        # If there is a server cursor associated with this query, it is
        # closed by sending a KillCursors command to the server.
        #
        # @note This method propagates any errors that occur when closing the
        #   server-side cursor.
        #
        # @return [ nil ] Always nil.
        #
        # @raise [ Error::OperationFailure ] If the server cursor close fails.
        #
        # @since 2.1.0
        def close_query
          # Close the cursor created by the most recent iteration, if there
          # is one; errors raised by the cursor's #close propagate unchanged.
          @cursor.close if @cursor
          # Return nil explicitly so the documented "always nil" contract
          # holds regardless of what the cursor's #close returns.
          nil
        end
        alias :kill_cursors :close_query

        private

        # Runs the initial query and returns the cursor to iterate.
        #
        # @param [ Object ] session The session passed to server selection,
        #   the initial query and the resulting cursor.
        #
        # @return [ Object ] A CachingCursor or Cursor for views that respond
        #   to write? and report a write; otherwise whatever
        #   read_with_retry_cursor returns.
        def select_cursor(session)
          if respond_to?(:write?, true) && write?
            # Writing views select a server with write_aggregation enabled
            # and send the query to it directly.
            write_server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
            reply = send_initial_query(write_server, session)

            cursor_class = use_query_cache? ? CachingCursor : Cursor
            cursor_class.new(view, reply, write_server, session: session)
          else
            # Everything else is delegated to read_with_retry_cursor, which
            # yields the server the initial query is sent to.
            read_with_retry_cursor(session, server_selector, view) do |read_server|
              send_initial_query(read_server, session)
            end
          end
        end

        # Looks up the query cache entry matching this view's cache options.
        #
        # @return [ Object ] Whatever QueryCache.get returns for those options.
        def cached_cursor
          lookup_options = cache_options
          QueryCache.get(**lookup_options)
        end

        # Collects the attributes of this view that are handed to the query
        # cache when storing or looking up a cursor.
        #
        # @return [ Hash ] The cache options, keyed by symbols.
        def cache_options
          # NB: the result is splatted as keyword arguments, so every key
          # must be a symbol. Each key below is also the name of the view
          # method that supplies its value.
          attribute_names = %i[
            selector skip sort limit projection
            collation read_concern read_preference
          ]

          attribute_names.each_with_object({ namespace: collection.namespace }) do |name, opts|
            opts[name] = send(name)
          end
        end

        # Builds the operation that performs the initial query for this view.
        #
        # @param [ Object ] session The session placed into the operation spec.
        #
        # @return [ Operation::Explain | Operation::Find ] An explain operation
        #   when the view is explained, otherwise a find operation.
        def initial_query_op(session)
          spec = {
            coll_name: collection.name,
            filter: filter,
            projection: projection,
            db_name: database.name,
            session: session,
            collation: collation,
            sort: sort,
            skip: skip,
            let: options[:let],
            limit: limit,
            allow_disk_use: options[:allow_disk_use],
            allow_partial_results: options[:allow_partial_results],
            read: read,
            # A read concern given in the options wins over the view's own.
            read_concern: options[:read_concern] || read_concern,
            batch_size: batch_size,
          }

          # These entries are copied verbatim from the view's options.
          %i[
            hint max_scan max_time_ms max_value min_value
            no_cursor_timeout return_key show_disk_loc comment
          ].each { |name| spec[name] = options[name] }

          # Any non-nil view option (including false) is used as is; only a
          # nil value falls back to the collection-level option.
          oplog_replay = options[:oplog_replay]
          oplog_replay = collection.options[:oplog_replay] if oplog_replay.nil?
          spec[:oplog_replay] = oplog_replay

          if oplog_replay
            collection.client.log_warn("The :oplog_replay option is deprecated and ignored by MongoDB 4.4 and later")
          end

          maybe_set_tailable_options(spec)

          return Operation::Find.new(spec) unless explained?

          spec[:explain] = options[:explain]
          Operation::Explain.new(spec)
        end

        # Builds the initial query operation and executes it on the server.
        #
        # @param [ Object ] server The server the operation is executed on.
        # @param [ Object | nil ] session The session used for both the
        #   operation and its context.
        #
        # @return [ Object ] Whatever the operation's #execute returns.
        def send_initial_query(server, session = nil)
          operation = initial_query_op(session)
          context = Operation::Context.new(client: client, session: session)
          operation.execute(server, context: context)
        end

        # Whether the query cache applies to this view: the cache must be
        # enabled and the collection must not be a system collection.
        #
        # @return [ true | false | nil ] A truthy value if the cache is used.
        def use_query_cache?
          cache_enabled = QueryCache.enabled?
          cache_enabled && !collection.system_collection?
        end

        # Add tailable cursor options to the command specification if needed.
        #
        # @param [ Hash ] spec The command specification.
        def maybe_set_tailable_options(spec)
          requested_type = cursor_type
          # Any cursor type other than the two tailable ones leaves the spec
          # untouched.
          return unless requested_type == :tailable || requested_type == :tailable_await

          spec[:tailable] = true
          # Only :tailable_await additionally sets the await_data flag.
          spec[:await_data] = true if requested_type == :tailable_await
          true
        end
      end
    end
  end
end