lib/mongo/operation/result.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/operation/shared/result/aggregatable'
require 'mongo/operation/shared/result/use_legacy_error_parser'
module Mongo
module Operation
# Result wrapper for wire protocol replies.
#
# An operation has zero or one replies. The only operations producing zero
# replies are unacknowledged writes; all other operations produce one reply.
# This class provides an object that can be operated on (for example, to
# check whether an operation succeeded) even when the operation did not
# produce a reply (in which case it is assumed to have succeeded).
#
# @since 2.0.0
# @api semiprivate
class Result
extend Forwardable
include Enumerable
# The field name for the cursor document in an aggregation.
#
# @since 2.2.0
# @api private
CURSOR = 'cursor'.freeze
# The cursor id field in the cursor document.
#
# @since 2.2.0
# @api private
CURSOR_ID = 'id'.freeze
# The field name for the first batch of a cursor.
#
# @since 2.2.0
# @api private
FIRST_BATCH = 'firstBatch'.freeze
# The field name for the next batch of a cursor.
#
# @since 2.2.0
# @api private
NEXT_BATCH = 'nextBatch'.freeze
# The namespace field in the cursor document.
#
# @since 2.2.0
# @api private
NAMESPACE = 'ns'.freeze
# The number of documents updated in the write.
#
# @since 2.0.0
# @api private
N = 'n'.freeze
# The ok status field in the result.
#
# @since 2.0.0
# @api private
OK = 'ok'.freeze
# The result field constant.
#
# @since 2.2.0
# @api private
RESULT = 'result'.freeze
# Initialize a new result.
#
# For an unkacknowledged write, pass nil in replies.
#
# For all other operations, replies must be a Protocol::Message instance
# or an array containing a single Protocol::Message instance.
#
# @param [ Protocol::Message | Array<Protocol::Message> | nil ] replies
# The wire protocol replies.
# @param [ Server::Description | nil ] connection_description
# Server description of the server that performed the operation that
# this result is for. This parameter is allowed to be nil for
# compatibility with existing mongo_kerberos library, but should
# always be not nil in the driver proper.
# @param [ Integer ] connection_global_id
# Global id of the connection on which the operation that
# this result is for was performed.
#
# @api private
def initialize(replies, connection_description = nil, connection_global_id = nil)
if replies
if replies.is_a?(Array)
if replies.length != 1
raise ArgumentError, "Only one (or zero) reply is supported, given #{replies.length}"
end
reply = replies.first
else
reply = replies
end
unless reply.is_a?(Protocol::Message)
raise ArgumentError, "Argument must be a Message instance, but is a #{reply.class}: #{reply.inspect}"
end
@replies = [ reply ]
@connection_description = connection_description
@connection_global_id = connection_global_id
end
end
# @return [ Array<Protocol::Message> ] replies The wrapped wire protocol replies.
#
# @api private
attr_reader :replies
# @return [ Server::Description ] Server description of the server that
# the operation was performed on that this result is for.
#
# @api private
attr_reader :connection_description
# @return [ Object ] Global is of the connection that
# the operation was performed on that this result is for.
#
# @api private
attr_reader :connection_global_id
# @api private
def_delegators :parser,
:not_master?, :node_recovering?, :node_shutting_down?
# Is the result acknowledged?
#
# @note On MongoDB 2.6 and higher all writes are acknowledged since the
# driver uses write commands for all write operations. On 2.4 and
# lower, the result is acknowledged if the GLE has been executed after
# the command. If not, no replies will be specified. Reads will always
# return true here since a replies is always provided.
#
# @return [ true, false ] If the result is acknowledged.
#
# @since 2.0.0
# @api public
def acknowledged?
!!@replies
end
# Whether the result contains cursor_id
#
# @return [ true, false ] If the result contains cursor_id.
#
# @api private
def has_cursor_id?
acknowledged? && replies.last.respond_to?(:cursor_id)
end
# Get the cursor id if the response is acknowledged.
#
# @note Cursor ids of 0 indicate there is no cursor on the server.
#
# @example Get the cursor id.
# result.cursor_id
#
# @return [ Integer ] The cursor id.
#
# @since 2.0.0
# @api private
def cursor_id
acknowledged? ? replies.last.cursor_id : 0
end
# Get the namespace of the cursor. The method should be defined in
# result classes where 'ns' is in the server response.
#
# @return [ Nil ]
#
# @since 2.0.0
# @api private
def namespace
nil
end
# Get the documents in the result.
#
# @example Get the documents.
# result.documents
#
# @return [ Array<BSON::Document> ] The documents.
#
# @since 2.0.0
# @api public
def documents
if acknowledged?
replies.flat_map(&:documents)
else
[]
end
end
# Iterate over the documents in the replies.
#
# @example Iterate over the documents.
# result.each do |doc|
# p doc
# end
#
# @return [ Enumerator ] The enumerator.
#
# @yieldparam [ BSON::Document ] Each document in the result.
#
# @since 2.0.0
# @api public
def each(&block)
documents.each(&block)
end
# Get the pretty formatted inspection of the result.
#
# @example Inspect the result.
# result.inspect
#
# @return [ String ] The inspection.
#
# @since 2.0.0
# @api public
def inspect
"#<#{self.class.name}:0x#{object_id} documents=#{documents}>"
end
# Get the reply from the result.
#
# Returns nil if there is no reply (i.e. the operation was an
# unacknowledged write).
#
# @return [ Protocol::Message ] The first reply.
#
# @since 2.0.0
# @api private
def reply
if acknowledged?
replies.first
else
nil
end
end
# Get the number of documents returned by the server in this batch.
#
# @return [ Integer ] The number of documents returned.
#
# @since 2.0.0
# @api public
def returned_count
if acknowledged?
reply.number_returned
else
0
end
end
# If the result was a command then determine if it was considered a
# success.
#
# @note If the write was unacknowledged, then this will always return
# true.
#
# @example Was the command successful?
# result.successful?
#
# @return [ true, false ] If the command was successful.
#
# @since 2.0.0
# @api public
def successful?
return true if !acknowledged?
if first_document.has_key?(OK)
ok? && parser.message.empty?
else
!query_failure? && parser.message.empty?
end
end
# Check the first document's ok field.
#
# @example Check the ok field.
# result.ok?
#
# @return [ true, false ] If the command returned ok.
#
# @since 2.1.0
# @api public
def ok?
# first_document[OK] is a float, and the server can return
# ok as a BSON int32, BSON int64 or a BSON double.
# The number 1 is exactly representable in a float, hence
# 1.0 == 1 is going to perform correctly all of the time
# (until the server returns something other than 1 for success, that is)
first_document[OK] == 1
end
# Validate the result by checking for any errors.
#
# @note This only checks for errors with writes since authentication is
# handled at the connection level and any authentication errors would
# be raised there, before a Result is ever created.
#
# @example Validate the result.
# result.validate!
#
# @raise [ Error::OperationFailure ] If an error is in the result.
#
# @return [ Result ] The result if verification passed.
#
# @since 2.0.0
# @api private
def validate!
!successful? ? raise_operation_failure : self
end
# The exception instance (of the Error::OperationFailure class)
# that would be raised during processing of this result.
#
# This method should only be called when result is not successful.
#
# @return [ Error::OperationFailure ] The exception.
#
# @api private
def error
@error ||= Error::OperationFailure.new(
parser.message,
self,
code: parser.code,
code_name: parser.code_name,
write_concern_error_document: parser.write_concern_error_document,
write_concern_error_code: parser.write_concern_error_code,
write_concern_error_code_name: parser.write_concern_error_code_name,
write_concern_error_labels: parser.write_concern_error_labels,
labels: parser.labels,
wtimeout: parser.wtimeout,
connection_description: connection_description,
document: parser.document,
server_message: parser.server_message,
)
end
# Raises a Mongo::OperationFailure exception corresponding to the
# error information in this result.
#
# @raise Error::OperationFailure
private def raise_operation_failure
raise error
end
# @return [ TopologyVersion | nil ] The topology version.
#
# @api private
def topology_version
unless defined?(@topology_version)
@topology_version = first_document['topologyVersion'] &&
TopologyVersion.new(first_document['topologyVersion'])
end
@topology_version
end
# Get the number of documents written by the server.
#
# @example Get the number of documents written.
# result.written_count
#
# @return [ Integer ] The number of documents written.
#
# @since 2.0.0
# @api public
def written_count
if acknowledged?
first_document[N] || 0
else
0
end
end
# @api public
alias :n :written_count
# Get the operation time reported in the server response.
#
# @example Get the operation time.
# result.operation_time
#
# @return [ Object | nil ] The operation time value.
#
# @since 2.5.0
# @api public
def operation_time
first_document && first_document[OPERATION_TIME]
end
# Get the cluster time reported in the server response.
#
# @example Get the cluster time.
# result.cluster_time
#
# @return [ ClusterTime | nil ] The cluster time document.
#
# Changed in version 2.9.0: This attribute became an instance of
# ClusterTime, which is a subclass of BSON::Document.
# Previously it was an instance of BSON::Document.
#
# @since 2.5.0
# @api public
def cluster_time
first_document && ClusterTime[first_document['$clusterTime']]
end
# Gets the set of error labels associated with the result.
#
# @example Get the labels.
# result.labels
#
# @return [ Array ] labels The set of labels.
#
# @since 2.7.0
# @api private
def labels
@labels ||= parser.labels
end
# Whether the operation failed with a write concern error.
#
# @api private
def write_concern_error?
!!(first_document && first_document['writeConcernError'])
end
def snapshot_timestamp
if doc = reply.documents.first
doc['cursor']&.[]('atClusterTime') || doc['atClusterTime']
end
end
private
def aggregate_returned_count
replies.reduce(0) do |n, reply|
n += reply.number_returned
n
end
end
def aggregate_written_count
documents.reduce(0) do |n, document|
n += (document[N] || 0)
n
end
end
def parser
@parser ||= Error::Parser.new(first_document, replies)
end
def first_document
@first_document ||= first || BSON::Document.new
end
def query_failure?
replies.first && (replies.first.query_failure? || replies.first.cursor_not_found?)
end
end
end
end