.. _change-streams:

**************
Change Streams
**************

.. default-domain:: mongodb

.. contents:: On this page
   :local:
   :backlinks: none
   :depth: 1
   :class: singlecol

Starting with MongoDB server version 3.6, the aggregation framework supports a
``$changeStream`` pipeline stage. Placing this stage first in an aggregation
pipeline requests notifications for all changes to a particular collection.
As of MongoDB 4.0, change streams are also supported on databases and clusters.

The Ruby driver provides an API for receiving notifications for changes to a
particular collection, database or cluster using this pipeline stage. Although
you can create a change stream by using the pipeline operator and the
aggregation framework directly, the driver API described below is recommended,
because the driver resumes the change stream once if there is a timeout, a
network error, a server error indicating that a failover is taking place, or
another resumable error.

Change streams on the server require a ``"majority"`` read concern or no
read concern.

Change streams do not work properly with JRuby because of the issue documented here_.
JRuby eagerly evaluates ``#next`` on an Enumerator in a background thread,
so calling ``#next`` on a change stream causes ``getMore`` commands to be
sent in a loop in the background.

.. _here: https://github.com/jruby/jruby/issues/4212

Watching for Changes on a Collection
====================================

A collection change stream is created by calling the ``#watch`` method on a
collection:

.. code-block:: ruby

  client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
  collection = client[:test]
  stream = collection.watch
  collection.insert_one(a: 1)
  doc = stream.to_enum.next # blocks until a change is available
  process(doc)              # process is your application's handler


You can also receive the notifications as they become available:

.. code-block:: ruby

  stream = collection.watch
  enum = stream.to_enum
  while doc = enum.next
    process(doc)
  end

The ``next`` method blocks and polls the cluster until a change is available.
Use the ``try_next`` method to iterate a change stream without blocking
indefinitely: it waits up to ``max_await_time_ms`` milliseconds for changes
from the server and returns ``nil`` if none are received. If there is a
non-resumable error, both ``next`` and ``try_next`` raise an exception.
See the "Resuming a Change Stream" section below for an example that reads
changes from a collection indefinitely.
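
To process whatever changes are already waiting and then move on to other
work, ``try_next`` can be called repeatedly until it returns ``nil``. The
following is a minimal sketch: ``each_available_change`` is a hypothetical
helper, not a driver method, and it assumes only that its argument responds
to ``try_next``, as the enumerator returned by ``stream.to_enum`` does in the
examples on this page.

.. code-block:: ruby

  # Hypothetical helper, not a driver method: yields each change that is
  # already available and returns how many were yielded once try_next
  # reports (by returning nil) that nothing arrived within max_await_time_ms.
  def each_available_change(enum)
    count = 0
    while (doc = enum.try_next)
      yield doc
      count += 1
    end
    count
  end

  # Example usage (requires the mongo gem and a running server):
  #   stream = collection.watch([], max_await_time_ms: 500)
  #   handled = each_available_change(stream.to_enum) { |doc| process(doc) }

On a busy stream the loop keeps running for as long as changes keep arriving,
so a caller that needs a hard bound would have to add a limit of its own.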

A change stream can take filters expressed as aggregation pipeline stages:

.. code-block:: ruby

  stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
                             {'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
                            ])
  enum = stream.to_enum
  while doc = enum.next
    process(doc)
  end
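
Because the pipeline is an ordinary array of hashes, it can also be assembled
in code. The sketch below uses a hypothetical ``change_filter_pipeline`` helper
that produces the same two stages as the example above. Restricting
``operationType`` to ``insert`` and ``replace`` is what lets the second stage
rely on ``fullDocument``, since those events always carry the full document.

.. code-block:: ruby

  # Hypothetical helper, not a driver method: builds the two $match stages
  # used above.
  #   operation_types - change event types to keep, e.g. %w(insert replace)
  #   min_n           - minimum value of the fullDocument.n field
  def change_filter_pipeline(operation_types, min_n)
    [
      { '$match' => { 'operationType' => { '$in' => operation_types } } },
      { '$match' => { 'fullDocument.n' => { '$gte' => min_n } } },
    ]
  end

  # Example usage:
  #   stream = collection.watch(change_filter_pipeline(%w(insert replace), 1))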

Watching for Changes on a Database
==================================

A database change stream reports changes to any collection within the
database, as well as database-wide events such as the database being dropped.

A database change stream is created by calling the ``#watch`` method on a
database object:

.. code-block:: ruby

  client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
  database = client.database
  stream = database.watch
  client[:test].insert_one(a: 1)
  doc = stream.to_enum.next
  process(doc)
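
Since events from several collections arrive on the same stream, an
application often needs to tell them apart. Each change event's ``ns`` field
holds the database name (``db``) and, for collection-level events, the
collection name (``coll``); database-wide events such as ``dropDatabase`` have
no ``coll``, and ``invalidate`` events have no ``ns`` at all. A minimal sketch
with a hypothetical ``change_namespace`` helper:

.. code-block:: ruby

  # Hypothetical helper, not a driver method: returns "db.coll" for
  # collection-level events, just "db" for database-wide events such as
  # dropDatabase (whose ns has no coll), and nil for events without an
  # ns field, such as invalidate.
  def change_namespace(change)
    ns = change['ns']
    return nil unless ns
    [ns['db'], ns['coll']].compact.join('.')
  end

  # Example usage with a database change stream:
  #   enum = database.watch.to_enum
  #   while doc = enum.next
  #     puts "#{doc['operationType']} on #{change_namespace(doc)}"
  #   end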


Watching for Changes on a Cluster
=================================

A cluster change stream reports changes to any collection in any database
within the cluster, as well as cluster-wide events.

A cluster change stream is created by calling the ``#watch`` method on a
client object (not the cluster object):

.. code-block:: ruby

  client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
  stream = client.watch
  client[:test].insert_one(a: 1)
  doc = stream.to_enum.next
  process(doc)
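
A cluster change stream accepts the same kind of pipeline as a collection
change stream, so it can be narrowed to particular databases by matching on
the ``ns.db`` field of each event. A sketch; ``database_filter`` and the
database names in the usage comment are hypothetical:

.. code-block:: ruby

  # Hypothetical helper, not a driver method: builds a one-stage pipeline
  # that keeps only events whose ns.db is one of db_names.
  def database_filter(db_names)
    [{ '$match' => { 'ns.db' => { '$in' => db_names } } }]
  end

  # Example usage with a cluster change stream:
  #   stream = client.watch(database_filter(%w(test reporting)))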


Closing a Change Stream
=======================

You can close a change stream by calling its ``#close`` method:

.. code-block:: ruby

  stream.close
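
To make sure a stream is closed even when processing raises, the work can be
wrapped in ``begin``/``ensure``. A minimal sketch with a hypothetical
``with_change_stream`` helper; it assumes only that the object passed in
responds to ``watch(pipeline, options)``, as collections, databases and
clients do:

.. code-block:: ruby

  # Hypothetical helper, not a driver method: opens a change stream, yields
  # it, and closes it afterwards even if the block raises, so the
  # server-side cursor is released promptly.
  def with_change_stream(watchable, pipeline = [], options = {})
    stream = watchable.watch(pipeline, options)
    yield stream
  ensure
    stream&.close # stream is nil if watch itself raised
  end

  # Example usage:
  #   with_change_stream(collection) do |stream|
  #     process(stream.to_enum.next)
  #   end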


Resuming a Change Stream
========================

A change stream consists of two types of operations: the initial aggregation
and ``getMore`` requests to receive the next batch of changes.

The driver automatically retries each ``getMore`` operation once on network
errors and when the server returns an error indicating that it changed state
(for example, it is no longer the primary). The driver does not retry the
initial aggregation.

In practical terms this means that, for example:

- Calling ``collection.watch`` will fail if the cluster does not have
  enough available nodes to satisfy the ``"majority"`` read concern.
- Once ``collection.watch`` successfully returns, if the cluster subsequently
  experiences an election or loses a node, but heals quickly enough,
  change stream reads via ``next`` or ``each`` methods will continue
  transparently to the application.
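
Because the initial aggregation is not retried, an application that wants to
tolerate a brief outage at startup can retry ``watch`` itself. A minimal
sketch, assuming the ``mongo`` gem is loaded (for ``Mongo::Error``);
``watch_with_retry`` and its ``attempts`` and ``delay`` parameters are
hypothetical:

.. code-block:: ruby

  # Hypothetical helper, not a driver method. Assumes the mongo gem is
  # loaded so that Mongo::Error is defined.
  # Calls watchable.watch (on a collection, database or client), making up
  # to +attempts+ attempts in total with +delay+ seconds between them, and
  # re-raises the last Mongo::Error if every attempt fails.
  def watch_with_retry(watchable, pipeline = [], attempts: 5, delay: 1, **options)
    tries = 0
    begin
      tries += 1
      watchable.watch(pipeline, options)
    rescue Mongo::Error
      raise if tries >= attempts
      sleep delay
      retry
    end
  end

  # Example usage:
  #   stream = watch_with_retry(collection, [], attempts: 10, resume_after: token)

Bounding the number of attempts keeps a persistent misconfiguration from
turning into an endless retry loop at startup.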

To watch for changes indefinitely and reliably, without losing any changes or
processing a change more than once, the application must track the resume
token for the change stream and restart the change stream when it experiences
extended error conditions that cause the driver's automatic resume to fail as
well. The following code snippet shows an example of iterating a change stream
indefinitely, retrieving the resume token with the ``resume_token`` change
stream method and restarting the change stream with the ``:resume_after``
option on all MongoDB or network errors:

.. code-block:: ruby

  token = nil
  loop do
    begin
      stream = collection.watch([], resume_after: token)
      enum = stream.to_enum
      while doc = enum.next
        process(doc)
        token = stream.resume_token
      end
    rescue Mongo::Error
      sleep 1
    end
  end

The iteration above blocks in the ``enum.next`` call and does not permit
resuming processing if the Ruby process running this code is terminated.
The driver also provides the ``try_next`` method, which returns ``nil``
(after a small waiting period) instead of blocking indefinitely when there
are no changes in the change stream. With ``try_next``, the resume token can
be retrieved after each ``getMore`` request, even when a particular request
returns no changes, so that the resume token stays at the top of the oplog
and the application has an opportunity to persist it should the process
handling changes terminate:

.. code-block:: ruby

  token = nil
  loop do
    begin
      stream = collection.watch([], resume_after: token)
      enum = stream.to_enum
      # Keep polling the same stream; only reopen it after an error
      loop do
        doc = enum.try_next
        if doc
          process(doc)
        end
        token = stream.resume_token
        # Persist +token+ to support resuming processing upon process restart
      end
    rescue Mongo::Error
      sleep 1
    end
  end

Note that the resume token should be retrieved from the change stream after
every ``try_next`` call, even if the call returned no document.
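
One way to persist the token is a small local file. The sketch below is only
an illustration: ``save_resume_token``, ``load_resume_token`` and the file
name are hypothetical, and it assumes the token's values are JSON-serializable
(for example a hash whose ``_data`` value is a string). A token containing
binary data would need BSON or Extended JSON serialization instead.

.. code-block:: ruby

  require 'json'

  # Hypothetical helpers, not part of the driver: store the resume token in
  # a local file so that processing can resume after a restart.
  # Assumption: the token is a hash such as {"_data" => "<hex string>"}
  # whose values are JSON-serializable.

  def save_resume_token(path, token)
    return if token.nil?                  # nothing to save yet
    tmp = "#{path}.tmp"
    File.write(tmp, JSON.generate(token.to_h))
    File.rename(tmp, path)                # atomic replace on POSIX file systems
  end

  def load_resume_token(path)
    return nil unless File.exist?(path)   # first run: no saved token
    JSON.parse(File.read(path))
  end

  # Example usage with the try_next loop:
  #   token = load_resume_token('resume_token.json')
  #   ...
  #   token = stream.resume_token
  #   save_resume_token('resume_token.json', token)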

The resume token is also provided in the ``_id`` field of each change stream
document. Reading the ``_id`` field is not recommended because it may be
projected out by the application, and because using only the ``_id`` field
would not advance the resume token when a ``getMore`` returns no documents.