RocketChat/Rocket.Chat

View on GitHub
apps/meteor/app/notification-queue/server/NotificationQueue.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import type { INotification, INotificationItemPush, INotificationItemEmail, NotificationItem, IUser } from '@rocket.chat/core-typings';
import { NotificationQueue, Users } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { SystemLogger } from '../../../server/lib/logger/system';
import { sendEmailFromData } from '../../lib/server/functions/notifications/email';
import { PushNotification } from '../../push-notifications/server';

const {
    NOTIFICATIONS_WORKER_TIMEOUT = 2000,
    NOTIFICATIONS_BATCH_SIZE = 100,
    NOTIFICATIONS_SCHEDULE_DELAY_ONLINE = 120,
    NOTIFICATIONS_SCHEDULE_DELAY_AWAY = 0,
    NOTIFICATIONS_SCHEDULE_DELAY_OFFLINE = 0,
} = process.env;

class NotificationClass {
    private running = false;

    private cyclePause = Number(NOTIFICATIONS_WORKER_TIMEOUT);

    private maxBatchSize = Number(NOTIFICATIONS_BATCH_SIZE);

    private maxScheduleDelaySeconds: { [key: string]: number } = {
        online: Number(NOTIFICATIONS_SCHEDULE_DELAY_ONLINE),
        away: Number(NOTIFICATIONS_SCHEDULE_DELAY_AWAY),
        offline: Number(NOTIFICATIONS_SCHEDULE_DELAY_OFFLINE),
    };

    initWorker(): void {
        this.running = true;
        this.executeWorkerLater();
    }

    stopWorker(): void {
        this.running = false;
    }

    executeWorkerLater(): void {
        if (!this.running) {
            return;
        }

        setTimeout(async () => {
            try {
                await this.worker();
            } catch (err) {
                SystemLogger.error({ msg: 'Error sending notification', err });
                this.executeWorkerLater();
            }
        }, this.cyclePause);
    }

    async worker(counter = 0): Promise<void> {
        const notification = await this.getNextNotification();

        if (!notification) {
            return this.executeWorkerLater();
        }

        // Once we start notifying the user we anticipate all the schedules
        const flush = await NotificationQueue.clearScheduleByUserId(notification.uid);

        // start worker again it queue flushed
        if (flush.modifiedCount) {
            await NotificationQueue.unsetSendingById(notification._id);
            return this.worker(counter);
        }

        try {
            for await (const item of notification.items) {
                switch (item.type) {
                    case 'push':
                        await this.push(notification, item);
                        break;
                    case 'email':
                        await this.email(item);
                        break;
                }
            }

            await NotificationQueue.removeById(notification._id);
        } catch (e) {
            SystemLogger.error(e);
            await NotificationQueue.setErrorById(notification._id, e instanceof Error ? e.message : String(e));
        }

        if (counter >= this.maxBatchSize) {
            return this.executeWorkerLater();
        }
        await this.worker(counter++);
    }

    getNextNotification(): Promise<INotification | null> {
        const expired = new Date();
        expired.setMinutes(expired.getMinutes() - 5);

        return NotificationQueue.findNextInQueueOrExpired(expired);
    }

    async push({ uid, rid, mid }: INotification, item: INotificationItemPush): Promise<void> {
        await PushNotification.send({
            rid,
            uid,
            mid,
            ...item.data,
        });
    }

    async email(item: INotificationItemEmail): Promise<void> {
        return sendEmailFromData(item.data);
    }

    async scheduleItem({
        uid,
        rid,
        mid,
        items,
        user,
    }: {
        uid: string;
        rid: string;
        mid: string;
        items: NotificationItem[];
        user?: Partial<IUser>;
    }): Promise<void> {
        const receiver =
            user ||
            (await Users.findOneById<Pick<IUser, 'statusConnection'>>(uid, {
                projection: {
                    statusConnection: 1,
                },
            }));

        if (!receiver) {
            return;
        }

        const { statusConnection = 'offline' } = receiver;

        let schedule: Date | undefined;

        const delay = this.maxScheduleDelaySeconds[statusConnection];

        if (delay < 0) {
            return;
        }
        if (delay > 0) {
            schedule = new Date();
            schedule.setSeconds(schedule.getSeconds() + delay);
        }

        await NotificationQueue.insertOne({
            uid,
            rid,
            mid,
            ts: new Date(),
            schedule,
            items,
        });
    }
}

export const Notification = new NotificationClass();

Meteor.startup(() => {
    Notification.initWorker();
});