apps/meteor/server/database/DatabaseWatcher.ts
import EventEmitter from 'events';
import type { IRocketChatRecord } from '@rocket.chat/core-typings';
import type { Logger } from '@rocket.chat/logger';
import { escapeRegExp } from '@rocket.chat/string-helpers';
import type { Timestamp, Db, ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument } from 'mongodb';
import { MongoClient } from 'mongodb';
import { convertChangeStreamPayload } from './convertChangeStreamPayload';
import { convertOplogPayload } from './convertOplogPayload';
import { getWatchCollections } from './watchCollections';
const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10000;
const maxDocMs = instancePing * 4; // 4 times the ping interval
export type RealTimeData<T> = {
id: string;
action: 'insert' | 'update' | 'remove';
clientAction: 'inserted' | 'updated' | 'removed';
data?: T;
diff?: Record<string, any>;
unset?: Record<string, number>;
oplog?: true;
};
const ignoreChangeStream = ['yes', 'true'].includes(String(process.env.IGNORE_CHANGE_STREAM).toLowerCase());
const useMeteorOplog = ['yes', 'true'].includes(String(process.env.USE_NATIVE_OPLOG).toLowerCase());
const useFullDocument = ['yes', 'true'].includes(String(process.env.CHANGESTREAM_FULL_DOCUMENT).toLowerCase());
export class DatabaseWatcher extends EventEmitter {
private db: Db;
private _oplogHandle?: any;
private metrics?: any;
private logger: Logger;
private resumeRetryCount = 0;
/**
* Last doc timestamp received from a real time event
*/
private lastDocTS: Date;
private watchCollections: string[];
// eslint-disable-next-line @typescript-eslint/naming-convention
constructor({ db, _oplogHandle, metrics, logger: LoggerClass }: { db: Db; _oplogHandle?: any; metrics?: any; logger: typeof Logger }) {
super();
this.db = db;
this._oplogHandle = _oplogHandle;
this.metrics = metrics;
this.logger = new LoggerClass('DatabaseWatcher');
}
async watch(): Promise<void> {
this.watchCollections = getWatchCollections();
if (useMeteorOplog) {
// TODO remove this when updating to Meteor 2.8
this.logger.warn(
'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.',
);
this.watchMeteorOplog();
return;
}
if (ignoreChangeStream) {
await this.watchOplog();
return;
}
try {
this.watchChangeStream();
} catch (err: unknown) {
await this.watchOplog();
}
}
private async watchOplog(): Promise<void> {
if (!process.env.MONGO_OPLOG_URL) {
throw Error('No $MONGO_OPLOG_URL provided');
}
const isMasterDoc = await this.db.admin().command({ ismaster: 1 });
if (!isMasterDoc?.setName) {
throw Error("$MONGO_URL should be a replica set's URL");
}
const dbName = this.db.databaseName;
const client = new MongoClient(process.env.MONGO_OPLOG_URL, {
maxPoolSize: 1,
});
if (client.db().databaseName !== 'local') {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
}
await client.connect();
this.logger.startup('Using oplog');
const db = client.db();
const oplogCollection = db.collection('oplog.rs');
const lastOplogEntry = await oplogCollection.findOne<{ ts: Timestamp }>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } });
const oplogSelector = {
ns: new RegExp(`^(?:${[escapeRegExp(`${dbName}.`)].join('|')})`),
op: { $in: ['i', 'u', 'd'] },
...(lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }),
};
const cursor = oplogCollection.find(oplogSelector);
cursor.addCursorFlag('tailable', true);
cursor.addCursorFlag('awaitData', true);
cursor.addCursorFlag('oplogReplay', true);
const stream = cursor.stream();
stream.on('data', (doc) => {
const doesMatter = this.watchCollections.some((collection) => doc.ns === `${dbName}.${collection}`);
if (!doesMatter) {
return;
}
this.emitDoc(
doc.ns.slice(dbName.length + 1),
convertOplogPayload({
id: doc.op === 'u' ? doc.o2._id : doc.o._id,
op: doc,
}),
);
});
}
private watchMeteorOplog(): void {
if (!this._oplogHandle) {
throw new Error('no-oplog-handle');
}
this.logger.startup('Using Meteor oplog');
this.watchCollections.forEach((collection) => {
this._oplogHandle.onOplogEntry({ collection }, (event: any) => {
this.emitDoc(collection, convertOplogPayload(event));
});
});
}
private watchChangeStream(resumeToken?: unknown): void {
try {
const options = {
...(useFullDocument ? { fullDocument: 'updateLookup' } : {}),
...(resumeToken ? { startAfter: resumeToken } : {}),
};
let lastEvent: unknown;
const changeStream = this.db.watch<
IRocketChatRecord,
| ChangeStreamInsertDocument<IRocketChatRecord>
| ChangeStreamUpdateDocument<IRocketChatRecord>
| ChangeStreamDeleteDocument<IRocketChatRecord>
>(
[
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.coll': { $in: this.watchCollections },
},
},
],
options,
);
changeStream.on('change', (event) => {
// reset retry counter
this.resumeRetryCount = 0;
// save last event to resume on error
lastEvent = event._id;
this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
});
changeStream.on('error', (err) => {
if (this.resumeRetryCount++ < 5) {
this.logger.warn({ msg: `Change stream error. Trying resume after ${this.resumeRetryCount} seconds.`, err });
setTimeout(() => {
this.watchChangeStream(lastEvent);
}, this.resumeRetryCount * 1000);
return;
}
throw err;
});
this.logger.startup('Using change streams');
} catch (err: unknown) {
this.logger.fatal({ msg: 'Cannot resume change stream.', err });
}
}
private emitDoc(collection: string, doc: RealTimeData<IRocketChatRecord> | void): void {
if (!doc) {
return;
}
this.lastDocTS = new Date();
this.metrics?.oplog.inc({
collection,
op: doc.action,
});
this.emit(collection, doc);
}
on<T>(collection: string, callback: (event: RealTimeData<T>) => void): this {
return super.on(collection, callback);
}
/**
* @returns the last timestamp delta in miliseconds received from a real time event
*/
getLastDocDelta(): number {
return this.lastDocTS ? Date.now() - this.lastDocTS.getTime() : Infinity;
}
/**
* @returns Indicates if the last document received is older than it should be. If that happens, it means that the oplog is not working properly
*/
isLastDocDelayed(): boolean {
return this.getLastDocDelta() > maxDocMs;
}
}