lib/mongo/collection/view/change_stream.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2017-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/collection/view/aggregation/behavior'
require 'mongo/collection/view/change_stream/retryable'
module Mongo
class Collection
class View
# Provides behavior around a `$changeStream` pipeline stage in the
# aggregation framework. Specifying this stage allows users to request
# that notifications are sent for all changes to a particular collection
# or database.
#
# @note Only available in server versions 3.6 and higher.
# @note ChangeStreams do not work properly with JRuby because of the
# issue documented here: https://github.com/jruby/jruby/issues/4212.
# Namely, JRuby eagerly evaluates #next on an Enumerator in a background
# green thread, therefore calling #next on the change stream will cause
# getMores to be called in a loop in the background.
#
#
# @since 2.5.0
class ChangeStream
include Aggregation::Behavior
include Retryable
# @return [ String ] The fullDocument option default value.
#
# @since 2.5.0
FULL_DOCUMENT_DEFAULT = 'default'.freeze
# @return [ Symbol ] Used to indicate that the change stream should listen for changes on
# the entire database rather than just the collection.
#
# @since 2.6.0
DATABASE = :database
# @return [ Symbol ] Used to indicate that the change stream should listen for changes on
# the entire cluster rather than just the collection.
#
# @since 2.6.0
CLUSTER = :cluster
# @return [ BSON::Document ] The change stream options.
#
# @since 2.5.0
attr_reader :options
# @return [ Cursor ] the underlying cursor for this operation
# @api private
attr_reader :cursor
# Initialize the change stream for the provided collection view, pipeline
# and options.
#
# @example Create the new change stream view.
# ChangeStream.new(view, pipeline, options)
#
# @param [ Collection::View ] view The collection view.
# @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications.
# @param [ Hash ] options The change stream options.
#
# @option options [ String ] :full_document Allowed values: nil, 'default',
# 'updateLookup', 'whenAvailable', 'required'.
#
# The default is to not send a value (i.e. nil), which is equivalent to
# 'default'. By default, the change notification for partial updates will
# include a delta describing the changes to the document.
#
# When set to 'updateLookup', the change notification for partial updates
# will include both a delta describing the changes to the document as well
# as a copy of the entire document that was changed from some time after
# the change occurred.
#
# When set to 'whenAvailable', configures the change stream to return the
# post-image of the modified document for replace and update change events
# if the post-image for this event is available.
#
# When set to 'required', the same behavior as 'whenAvailable' except that
# an error is raised if the post-image is not available.
# @option options [ String ] :full_document_before_change Allowed values: nil,
# 'whenAvailable', 'required', 'off'.
#
# The default is to not send a value (i.e. nil), which is equivalent to 'off'.
#
# When set to 'whenAvailable', configures the change stream to return the
# pre-image of the modified document for replace, update, and delete change
# events if it is available.
#
# When set to 'required', the same behavior as 'whenAvailable' except that
# an error is raised if the pre-image is not available.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
# new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
# @option options [ BSON::Timestamp ] :start_at_operation_time Only
# return changes that occurred at or after the specified timestamp. Any
# command run against the server will return a cluster time that can
# be used here. Only recognized by server versions 4.0+.
# @option options [ Bson::Document, Hash ] :start_after Similar to :resume_after, this
# option takes a resume token and starts a new change stream returning the first
# notification after the token. This will allow users to watch collections that have been
# dropped and recreated or newly renamed collections without missing any notifications.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
# @option options [ Boolean ] :show_expanded_events Enables the server to
# send the 'expanded' list of change stream events. The list of additional
# events included with this flag set are: createIndexes, dropIndexes,
# modify, create, shardCollection, reshardCollection,
# refineCollectionShardKey.
#
# The server will report an error if `startAfter` and `resumeAfter` are both specified.
#
# @since 2.5.0
def initialize(view, pipeline, changes_for, options = {})
# change stream cursors can only be :iterable, so we don't allow
# timeout_mode to be specified.
perform_setup(view, options, forbid: %i[ timeout_mode ]) do
@changes_for = changes_for
@change_stream_filters = pipeline && pipeline.dup
@start_after = @options[:start_after]
end
# The resume token tracked by the change stream, used only
# when there is no cursor, or no cursor resume token
@resume_token = @start_after || @options[:resume_after]
create_cursor!
# We send different parameters when we resume a change stream
# compared to when we send the first query
@resuming = true
end
# Iterate through documents returned by the change stream.
#
# This method retries once per error on resumable errors
# (two consecutive errors result in the second error being raised,
# an error which is recovered from resets the error count to zero).
#
# @example Iterate through the stream of documents.
# stream.each do |document|
# p document
# end
#
# @return [ Enumerator ] The enumerator.
#
# @since 2.5.0
#
# @yieldparam [ BSON::Document ] Each change stream document.
def each
raise StopIteration.new if closed?
loop do
document = try_next
yield document if document
end
rescue StopIteration
return self
end
# Return one document from the change stream, if one is available.
#
# Retries once on a resumable error.
#
# Raises StopIteration if the change stream is closed.
#
# This method will wait up to max_await_time_ms milliseconds
# for changes from the server, and if no changes are received
# it will return nil.
#
# @return [ BSON::Document | nil ] A change stream document.
# @since 2.6.0
def try_next
recreate_cursor! if @timed_out
raise StopIteration.new if closed?
begin
doc = @cursor.try_next
rescue Mongo::Error => e
# "If a next call fails with a timeout error, drivers MUST NOT
# invalidate the change stream. The subsequent next call MUST
# perform a resume attempt to establish a new change stream on the
# server..."
#
# However, SocketTimeoutErrors are TimeoutErrors, but are also
# change-stream-resumable. To preserve existing (specified) behavior,
# We only count timeouts when the error is not also
# change-stream-resumable.
@timed_out = e.is_a?(Mongo::Error::TimeoutError) && !e.change_stream_resumable?
raise unless @timed_out || e.change_stream_resumable?
@resume_token = @cursor.resume_token
raise e if @timed_out
recreate_cursor!(@cursor.context)
retry
end
# We need to verify each doc has an _id, so we
# have a resume token to work with
if doc && doc['_id'].nil?
raise Error::MissingResumeToken
end
doc
end
def to_enum
enum = super
enum.send(:instance_variable_set, '@obj', self)
class << enum
def try_next
@obj.try_next
end
end
enum
end
# Close the change stream.
#
# @example Close the change stream.
# stream.close
#
# @note This method attempts to close the cursor used by the change
# stream, which in turn closes the server-side change stream cursor.
# This method ignores any errors that occur when closing the
# server-side cursor.
#
# @params [ Hash ] opts Options to be passed to the cursor close
# command.
#
# @return [ nil ] Always nil.
#
# @since 2.5.0
def close(opts = {})
unless closed?
begin
@cursor.close(opts)
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::MissingConnection
# ignore
end
@cursor = nil
end
end
# Is the change stream closed?
#
# @example Determine whether the change stream is closed.
# stream.closed?
#
# @return [ true, false ] If the change stream is closed.
#
# @since 2.5.0
def closed?
@cursor.nil?
end
# Get a formatted string for use in inspection.
#
# @example Inspect the change stream object.
# stream.inspect
#
# @return [ String ] The change stream inspection.
#
# @since 2.5.0
def inspect
"#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
"options=#{@options} resume_token=#{resume_token}>"
end
# Returns the resume token that the stream will
# use to automatically resume, if one exists.
#
# @example Get the change stream resume token.
# stream.resume_token
#
# @return [ BSON::Document | nil ] The change stream resume token.
#
# @since 2.10.0
def resume_token
cursor_resume_token = @cursor.resume_token if @cursor
cursor_resume_token || @resume_token
end
# "change streams are an abstraction around tailable-awaitData cursors..."
#
# @return :tailable_await
def cursor_type
:tailable_await
end
# "change streams...implicitly use ITERATION mode"
#
# @return :iteration
def timeout_mode
:iteration
end
# Returns the value of the max_await_time_ms option that was
# passed to this change stream.
#
# @return [ Integer | nil ] the max_await_time_ms value
def max_await_time_ms
options[:max_await_time_ms]
end
private
def for_cluster?
@changes_for == CLUSTER
end
def for_database?
@changes_for == DATABASE
end
def for_collection?
!for_cluster? && !for_database?
end
def create_cursor!(timeout_ms = nil)
# clear the cache because we may get a newer or an older server
# (rolling upgrades)
@start_at_operation_time_supported = nil
session = client.get_session(@options)
context = Operation::Context.new(client: client, session: session, view: self, operation_timeouts: timeout_ms ? { operation_timeout_ms: timeout_ms } : operation_timeouts)
start_at_operation_time = nil
start_at_operation_time_supported = nil
@cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
server.with_connection do |connection|
start_at_operation_time_supported = connection.description.server_version_gte?('4.0')
result = send_initial_query(connection, context)
if doc = result.replies.first && result.replies.first.documents.first
start_at_operation_time = doc['operationTime']
else
# The above may set @start_at_operation_time to nil
# if it was not in the document for some reason,
# for consistency set it to nil here as well.
# NB: since this block may be executed more than once, each
# execution must write to start_at_operation_time either way.
start_at_operation_time = nil
end
result
end
end
@start_at_operation_time = start_at_operation_time
@start_at_operation_time_supported = start_at_operation_time_supported
end
def pipeline
[{ '$changeStream' => change_doc }] + @change_stream_filters
end
def aggregate_spec(session, read_preference)
super(session, read_preference).tap do |spec|
spec[:selector][:aggregate] = 1 unless for_collection?
end
end
def change_doc
{}.tap do |doc|
if @options[:full_document]
doc[:fullDocument] = @options[:full_document]
end
if @options[:full_document_before_change]
doc[:fullDocumentBeforeChange] = @options[:full_document_before_change]
end
if @options.key?(:show_expanded_events)
doc[:showExpandedEvents] = @options[:show_expanded_events]
end
if resuming?
# We have a resume token once we retrieved any documents.
# However, if the first getMore fails and the user didn't pass
# a resume token we won't have a resume token to use.
# Use start_at_operation time in this case
if resume_token
# Spec says we need to remove both startAtOperationTime and startAfter if
# either was passed in by user, thus we won't forward them
doc[:resumeAfter] = resume_token
elsif @start_at_operation_time_supported && @start_at_operation_time
# It is crucial to check @start_at_operation_time_supported
# here - we may have switched to an older server that
# does not support operation times and therefore shouldn't
# try to send one to it!
#
# @start_at_operation_time is already a BSON::Timestamp
doc[:startAtOperationTime] = @start_at_operation_time
else
# Can't resume if we don't have either
raise Mongo::Error::MissingResumeToken
end
else
if @start_after
doc[:startAfter] = @start_after
elsif resume_token
doc[:resumeAfter] = resume_token
end
if options[:start_at_operation_time]
doc[:startAtOperationTime] = time_to_bson_timestamp(
options[:start_at_operation_time])
end
end
doc[:allChangesForCluster] = true if for_cluster?
end
end
def send_initial_query(connection, context)
initial_query_op(context.session, view.read_preference)
.execute_with_connection(
connection,
context: context,
)
end
def time_to_bson_timestamp(time)
if time.is_a?(Time)
seconds = time.to_f
BSON::Timestamp.new(seconds.to_i, ((seconds - seconds.to_i) * 1000000).to_i)
elsif time.is_a?(BSON::Timestamp)
time
else
raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
end
end
def resuming?
!!@resuming
end
# Recreates the current cursor (typically as a consequence of attempting
# to resume the change stream)
def recreate_cursor!(context = nil)
@timed_out = false
close
create_cursor!(context&.remaining_timeout_ms)
end
end
end
end
end