RocketChat/Rocket.Chat

View on GitHub
apps/meteor/server/database/DatabaseWatcher.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import EventEmitter from 'events';

import type { IRocketChatRecord } from '@rocket.chat/core-typings';
import type { Logger } from '@rocket.chat/logger';
import { escapeRegExp } from '@rocket.chat/string-helpers';
import type { Timestamp, Db, ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument } from 'mongodb';
import { MongoClient } from 'mongodb';

import { convertChangeStreamPayload } from './convertChangeStreamPayload';
import { convertOplogPayload } from './convertOplogPayload';
import { getWatchCollections } from './watchCollections';

const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10000;

const maxDocMs = instancePing * 4; // 4 times the ping interval

export type RealTimeData<T> = {
    id: string;
    action: 'insert' | 'update' | 'remove';
    clientAction: 'inserted' | 'updated' | 'removed';
    data?: T;
    diff?: Record<string, any>;
    unset?: Record<string, number>;
    oplog?: true;
};

const ignoreChangeStream = ['yes', 'true'].includes(String(process.env.IGNORE_CHANGE_STREAM).toLowerCase());

const useMeteorOplog = ['yes', 'true'].includes(String(process.env.USE_NATIVE_OPLOG).toLowerCase());

const useFullDocument = ['yes', 'true'].includes(String(process.env.CHANGESTREAM_FULL_DOCUMENT).toLowerCase());

export class DatabaseWatcher extends EventEmitter {
    private db: Db;

    private _oplogHandle?: any;

    private metrics?: any;

    private logger: Logger;

    private resumeRetryCount = 0;

    /**
     * Last doc timestamp received from a real time event
     */
    private lastDocTS: Date;

    private watchCollections: string[];

    // eslint-disable-next-line @typescript-eslint/naming-convention
    constructor({ db, _oplogHandle, metrics, logger: LoggerClass }: { db: Db; _oplogHandle?: any; metrics?: any; logger: typeof Logger }) {
        super();

        this.db = db;
        this._oplogHandle = _oplogHandle;
        this.metrics = metrics;
        this.logger = new LoggerClass('DatabaseWatcher');
    }

    async watch(): Promise<void> {
        this.watchCollections = getWatchCollections();

        if (useMeteorOplog) {
            // TODO remove this when updating to Meteor 2.8
            this.logger.warn(
                'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.',
            );
            this.watchMeteorOplog();
            return;
        }

        if (ignoreChangeStream) {
            await this.watchOplog();
            return;
        }

        try {
            this.watchChangeStream();
        } catch (err: unknown) {
            await this.watchOplog();
        }
    }

    private async watchOplog(): Promise<void> {
        if (!process.env.MONGO_OPLOG_URL) {
            throw Error('No $MONGO_OPLOG_URL provided');
        }

        const isMasterDoc = await this.db.admin().command({ ismaster: 1 });
        if (!isMasterDoc?.setName) {
            throw Error("$MONGO_URL should be a replica set's URL");
        }

        const dbName = this.db.databaseName;

        const client = new MongoClient(process.env.MONGO_OPLOG_URL, {
            maxPoolSize: 1,
        });

        if (client.db().databaseName !== 'local') {
            throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
        }

        await client.connect();

        this.logger.startup('Using oplog');

        const db = client.db();

        const oplogCollection = db.collection('oplog.rs');

        const lastOplogEntry = await oplogCollection.findOne<{ ts: Timestamp }>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } });

        const oplogSelector = {
            ns: new RegExp(`^(?:${[escapeRegExp(`${dbName}.`)].join('|')})`),
            op: { $in: ['i', 'u', 'd'] },
            ...(lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }),
        };

        const cursor = oplogCollection.find(oplogSelector);

        cursor.addCursorFlag('tailable', true);
        cursor.addCursorFlag('awaitData', true);
        cursor.addCursorFlag('oplogReplay', true);

        const stream = cursor.stream();

        stream.on('data', (doc) => {
            const doesMatter = this.watchCollections.some((collection) => doc.ns === `${dbName}.${collection}`);
            if (!doesMatter) {
                return;
            }

            this.emitDoc(
                doc.ns.slice(dbName.length + 1),
                convertOplogPayload({
                    id: doc.op === 'u' ? doc.o2._id : doc.o._id,
                    op: doc,
                }),
            );
        });
    }

    private watchMeteorOplog(): void {
        if (!this._oplogHandle) {
            throw new Error('no-oplog-handle');
        }

        this.logger.startup('Using Meteor oplog');

        this.watchCollections.forEach((collection) => {
            this._oplogHandle.onOplogEntry({ collection }, (event: any) => {
                this.emitDoc(collection, convertOplogPayload(event));
            });
        });
    }

    private watchChangeStream(resumeToken?: unknown): void {
        try {
            const options = {
                ...(useFullDocument ? { fullDocument: 'updateLookup' } : {}),
                ...(resumeToken ? { startAfter: resumeToken } : {}),
            };

            let lastEvent: unknown;

            const changeStream = this.db.watch<
                IRocketChatRecord,
                | ChangeStreamInsertDocument<IRocketChatRecord>
                | ChangeStreamUpdateDocument<IRocketChatRecord>
                | ChangeStreamDeleteDocument<IRocketChatRecord>
            >(
                [
                    {
                        $match: {
                            'operationType': { $in: ['insert', 'update', 'delete'] },
                            'ns.coll': { $in: this.watchCollections },
                        },
                    },
                ],
                options,
            );
            changeStream.on('change', (event) => {
                // reset retry counter
                this.resumeRetryCount = 0;

                // save last event to resume on error
                lastEvent = event._id;

                this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
            });

            changeStream.on('error', (err) => {
                if (this.resumeRetryCount++ < 5) {
                    this.logger.warn({ msg: `Change stream error. Trying resume after ${this.resumeRetryCount} seconds.`, err });

                    setTimeout(() => {
                        this.watchChangeStream(lastEvent);
                    }, this.resumeRetryCount * 1000);

                    return;
                }

                throw err;
            });

            this.logger.startup('Using change streams');
        } catch (err: unknown) {
            this.logger.fatal({ msg: 'Cannot resume change stream.', err });
        }
    }

    private emitDoc(collection: string, doc: RealTimeData<IRocketChatRecord> | void): void {
        if (!doc) {
            return;
        }

        this.lastDocTS = new Date();

        this.metrics?.oplog.inc({
            collection,
            op: doc.action,
        });

        this.emit(collection, doc);
    }

    on<T>(collection: string, callback: (event: RealTimeData<T>) => void): this {
        return super.on(collection, callback);
    }

    /**
     * @returns the last timestamp delta in miliseconds received from a real time event
     */
    getLastDocDelta(): number {
        return this.lastDocTS ? Date.now() - this.lastDocTS.getTime() : Infinity;
    }

    /**
     * @returns Indicates if the last document received is older than it should be. If that happens, it means that the oplog is not working properly
     */
    isLastDocDelayed(): boolean {
        return this.getLastDocDelta() > maxDocMs;
    }
}