lib/moped/cursor.rb
module Moped
# Contains logic for cursor behaviour.
#
# @api private
class Cursor
include Readable
include Enumerable
include Retryable
# @attribute [r] get_more_op The get more message.
# @attribute [r] kill_cursor_op The kill cursor message.
# @attribute [r] query_op The query message.
# @attribute [r] session The session.
attr_reader :get_more_op, :kill_cursor_op, :query_op, :session
# Iterate over the results of the query.
#
# @example Iterate over the results.
# cursor.each do |doc|
# #...
# end
#
# @return [ Enumerator ] The cursor enum.
#
# @since 1.0.0
def each
documents = load_docs
documents.each { |doc| yield doc }
while more?
return kill if limited? && @limit <= 0
documents = get_more
documents.each { |doc| yield doc }
end
end
# Get more documents from the database for the cursor. Executes a get more
# command.
#
# @example Get more docs.
# cursor.get_more
#
# @return [ Array<Hash> ] The next batch of documents.
#
# @since 1.0.0
def get_more
with_retry(session.cluster) do
reply = @node.get_more @database, @collection, @cursor_id, request_limit
@limit -= reply.count if limited?
@cursor_id = reply.cursor_id
reply.documents
end
end
# Determine the request limit for the query
#
# @example What is the cursor request_limit
# cursor.request_limit
#
# @return [ Integer ]
#
# @since 1.0.0
def request_limit
if limited?
@batch_size < @limit ? @batch_size : @limit
else
@batch_size
end
end
# Initialize the new cursor.
#
# @example Create the new cursor.
# Cursor.new(session, message)
#
# @param [ Session ] session The session.
# @param [ Message ] query_operation The query message.
#
# @since 1.0.0
def initialize(session, query_operation)
@session = session
@database = query_operation.database
@collection = query_operation.collection
@selector = query_operation.selector
@cursor_id = 0
@limit = query_operation.limit
@limited = @limit > 0
@batch_size = query_operation.batch_size || @limit
@options = {
request_id: query_operation.request_id,
flags: query_operation.flags,
limit: query_operation.limit,
skip: query_operation.skip,
fields: query_operation.fields,
}
end
# Kill the cursor.
#
# @example Kill the cursor.
# cursor.kill
#
# @return [ Object ] The result of the kill cursors command.
#
# @since 1.0.0
def kill
@node.kill_cursors([ @cursor_id ])
end
# Does the cursor have a limit provided in the query?
#
# @example Is the cursor limited?
# cursor.limited?
#
# @return [ true, false ] If a limit has been provided over zero.
#
# @since 1.0.0
def limited?
@limited
end
# Load the documents from the database.
#
# @example Load the documents.
# cursor.load_docs
#
# @return [ Array<Hash> ] The documents.
#
# @since 1.0.0
def load_docs
@options[:flags] |= [:no_cursor_timeout] if @options[:no_timeout]
options = @options.clone
options[:limit] = request_limit
reply, @node = read_preference.with_node(session.cluster) do |node|
[ node.query(@database, @collection, @selector, query_options(options)), node ]
end
@limit -= reply.count if limited?
@cursor_id = reply.cursor_id
reply.documents
end
# Are there more documents to be returned from the database?
#
# @example Are there more documents?
# cursor.more?
#
# @return [ true, false ] If there are more documents to load.
#
# @since 1.0.0
def more?
@cursor_id != 0
end
end
end