.. _change-streams:

**************
Change Streams
**************

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 1
   :class: singlecol

As of version 3.6 of the MongoDB server, a new ``$changeStream`` pipeline stage
is supported in the aggregation framework. Specifying this stage first in an
aggregation pipeline allows users to request that notifications be sent for all
changes to a particular collection. As of MongoDB 4.0, change streams are
supported on databases and clusters in addition to collections.

The Ruby driver provides an API for receiving notifications for changes to a
particular collection, database or cluster using this pipeline stage. Although
you can create a change stream using the pipeline operator and aggregation
framework directly, it is recommended to use the driver API described below,
because the driver resumes the change stream once when it encounters a timeout,
a network error, a server error indicating that a failover is taking place, or
another resumable error.

Change streams on the server require a ``"majority"`` read concern or no
read concern.
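
If an application sets a read concern explicitly, a client used with change
streams can be configured with the ``"majority"`` read concern via the standard
client option. This is a configuration sketch; the address and database name
are placeholders:

.. code-block:: ruby

   # Configuration sketch: a client whose operations, including change
   # streams, use the "majority" read concern. The address and database
   # name below are placeholders.
   client = Mongo::Client.new(
     [ '127.0.0.1:27017' ],
     :database => 'test',
     :read_concern => { :level => :majority }
   )
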

Change streams do not work properly with JRuby because of the issue documented
here_. Namely, JRuby eagerly evaluates ``#next`` on an Enumerator in a
background thread; calling ``#next`` on the change stream therefore causes
``getMore`` requests to be issued in a loop in the background.

.. _here: https://github.com/jruby/jruby/issues/4212

Watching for Changes on a Collection
====================================

A collection change stream is created by calling the ``#watch`` method on a
collection:

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   collection = client[:test]

   stream = collection.watch
   collection.insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)
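
The documents yielded by a change stream are change event documents. For an
insert they have roughly the following shape; the values below are invented
for illustration, and the real ``_id`` resume token is an opaque
server-generated value:

.. code-block:: ruby

   # A change event document for an insert, with invented example values.
   doc = {
     '_id' => { '_data' => 'opaque-resume-token' }, # the resume token
     'operationType' => 'insert',
     'ns' => { 'db' => 'test', 'coll' => 'test' },
     'documentKey' => { '_id' => 1 },
     'fullDocument' => { '_id' => 1, 'a' => 1 },
   }

   # A process(doc) implementation typically dispatches on operationType.
   inserted = doc['fullDocument'] if doc['operationType'] == 'insert'
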

You can also receive the notifications as they become available:

.. code-block:: ruby

   stream = collection.watch
   enum = stream.to_enum
   while doc = enum.next
     process(doc)
   end

The ``next`` method blocks and polls the cluster until a change is available.
Use the ``try_next`` method to iterate a change stream without blocking; this
method waits up to ``max_await_time_ms`` milliseconds for changes from the
server, and returns ``nil`` if no changes are received. If there is a
non-resumable error, both ``next`` and ``try_next`` raise an exception.

See the Resuming a Change Stream section below for an example that reads
changes from a collection indefinitely.
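
The difference in blocking behavior can be pictured with a stand-in object.
``StubStream`` below is purely illustrative, not a driver class; the point is
that ``try_next`` treats "no change available yet" as a normal ``nil`` outcome:

.. code-block:: ruby

   # Illustration only: a stand-in whose try_next returns nil when no
   # change is available yet, then yields a document.
   class StubStream
     def initialize(batches)
       @batches = batches
     end

     def try_next
       @batches.shift # nil models "no changes within max_await_time_ms"
     end
   end

   stream = StubStream.new([nil, { 'operationType' => 'insert' }])

   received = []
   2.times do
     doc = stream.try_next
     received << doc if doc # unlike next, nil is not an error here
   end
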

The change stream can take filters in the aggregation framework pipeline
operator format:

.. code-block:: ruby

   stream = collection.watch([
     { '$match' => { 'operationType' => { '$in' => ['insert', 'replace'] } } },
     { '$match' => { 'fullDocument.n' => { '$gte' => 1 } } },
   ])
   enum = stream.to_enum
   while doc = enum.next
     process(doc)
   end

Watching for Changes on a Database
==================================

A database change stream notifies on changes on any collection within the
database as well as database-wide events, such as the database being dropped.

A database change stream is created by calling the ``#watch`` method on a
database object:

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
   database = client.database

   stream = database.watch
   client[:test].insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)

Watching for Changes on a Cluster
=================================

A cluster change stream notifies on changes on any collection and any database
within the cluster, as well as cluster-wide events.

A cluster change stream is created by calling the ``#watch`` method on a
client object (not the cluster object):

.. code-block:: ruby

   client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')

   stream = client.watch
   client[:test].insert_one(a: 1)
   doc = stream.to_enum.next
   process(doc)

Closing a Change Stream
=======================

You can close a change stream by calling its ``#close`` method:

.. code-block:: ruby

   stream.close

Resuming a Change Stream
========================

A change stream consists of two types of operations: the initial aggregation
and ``getMore`` requests to receive subsequent batches of changes.

The driver automatically retries each ``getMore`` operation once on network
errors and when the server returns an error indicating it changed state (for
example, it is no longer the primary). The driver does not retry the initial
aggregation.

In practical terms this means that, for example:

- Calling ``collection.watch`` will fail if the cluster does not have
  enough available nodes to satisfy the ``"majority"`` read concern.
- Once ``collection.watch`` successfully returns, if the cluster subsequently
  experiences an election or loses a node, but heals quickly enough,
  change stream reads via the ``next`` or ``each`` methods will continue
  transparently to the application.
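
The retry-once behavior described above can be sketched in plain Ruby.
``FlakyStream`` and the error class below are stand-ins for the driver's
internals, not driver classes:

.. code-block:: ruby

   # Stand-in stream whose first getMore raises a transient error,
   # mimicking a brief failover during a change stream read.
   class FlakyStream
     def initialize
       @failed_once = false
     end

     def get_more
       unless @failed_once
         @failed_once = true
         raise IOError, 'transient network error'
       end
       [{ 'operationType' => 'insert' }]
     end
   end

   # Retry a getMore exactly once on error, as the driver does internally;
   # a second consecutive failure propagates to the application.
   def next_batch(stream)
     stream.get_more
   rescue IOError
     stream.get_more
   end

   batch = next_batch(FlakyStream.new)
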

To watch for changes indefinitely and reliably, without losing changes or
processing a change more than once, the application must track the resume
token for the change stream and restart the change stream when it experiences
extended error conditions that cause the driver's automatic resume to also
fail. The following code snippet shows an example of iterating a change stream
indefinitely, retrieving the resume token using the ``resume_token`` change
stream method and restarting the change stream using the ``:resume_after``
option on any MongoDB or network error:

.. code-block:: ruby

   token = nil

   loop do
     begin
       stream = collection.watch([], resume_after: token)
       enum = stream.to_enum
       while doc = enum.next
         process(doc)
         token = stream.resume_token
       end
     rescue Mongo::Error
       sleep 1
     end
   end

The above iteration blocks at the ``enum.next`` call and does not permit
resuming processing in the event the Ruby process running this code is
terminated. The driver also provides the ``try_next`` method, which returns
``nil`` (after a small waiting period) instead of blocking indefinitely when
there are no changes in the change stream. With ``try_next``, the resume
token may be persisted after each ``getMore`` request, even when a particular
request returns no changes. This keeps the resume token near the top of the
oplog and gives the application an opportunity to persist it should the
process handling changes terminate:

.. code-block:: ruby

   token = nil

   loop do
     begin
       stream = collection.watch([], resume_after: token)
       enum = stream.to_enum
       doc = enum.try_next
       if doc
         process(doc)
       end
       token = stream.resume_token
       # Persist +token+ to support resuming processing upon process restart
     rescue Mongo::Error
       sleep 1
     end
   end

Note that the resume token should be retrieved from the change stream after
every ``try_next`` call, even if the call returned no document.

The resume token is also available in the ``_id`` field of each change stream
document. Relying on the ``_id`` field is not recommended, because it may be
projected out by the application, and because using only the ``_id`` field
would not advance the resume token when a ``getMore`` returns no documents.
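
Persisting the resume token across process restarts could be done, for
example, by writing it to a file. This is a sketch: the file path, the JSON
serialization, and the invented token value are application choices, not
driver requirements:

.. code-block:: ruby

   require 'json'
   require 'tmpdir'

   # Hypothetical persistence helpers; the path and format are
   # application choices, not part of the driver API.
   TOKEN_PATH = File.join(Dir.tmpdir, 'change_stream_token.json')

   def save_token(token)
     File.write(TOKEN_PATH, JSON.generate(token)) if token
   end

   def load_token
     File.exist?(TOKEN_PATH) ? JSON.parse(File.read(TOKEN_PATH)) : nil
   end

   # In the loops above, call save_token(stream.resume_token) wherever
   # the token is updated; on startup, pass load_token to :resume_after.
   save_token('_data' => 'invented-token-value')
   token = load_token
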