RocketChat/Rocket.Chat

View on GitHub
apps/meteor/app/notifications/server/lib/Presence.ts

Summary

Maintainability
A
0 mins
Test Coverage
import type { IUser } from '@rocket.chat/core-typings';
import type { StreamerEvents } from '@rocket.chat/ddp-client';
import { Emitter } from '@rocket.chat/emitter';
import type { IPublication, IStreamerConstructor, Connection, IStreamer } from 'meteor/rocketchat:streamer';

type UserPresenceStreamProps = {
    added: IUser['_id'][];
    removed: IUser['_id'][];
};

type UserPresenceStreamArgs = {
    uid: string;
    args: StreamerEvents['user-presence'][number]['args'];
};

const e = new Emitter<{
    [key: string]: UserPresenceStreamArgs;
}>();

const clients = new WeakMap<Connection, UserPresence>();

class UserPresence {
    private readonly streamer: IStreamer<'user-presence'>;

    private readonly publication: IPublication;

    private readonly listeners: Set<string>;

    constructor(publication: IPublication, streamer: IStreamer<'user-presence'>) {
        this.listeners = new Set();
        this.publication = publication;
        this.streamer = streamer;
    }

    listen(uid: string): void {
        if (this.listeners.has(uid)) {
            return;
        }
        e.on(uid, this.run);
        this.listeners.add(uid);
    }

    off = (uid: string): void => {
        e.off(uid, this.run);
        this.listeners.delete(uid);
    };

    run = (args: UserPresenceStreamArgs): void => {
        const payload = this.streamer.changedPayload(this.streamer.subscriptionName, args.uid, { ...args, eventName: args.uid }); // there is no good explanation to keep eventName, I just want to save one 'DDPCommon.parseDDP' on the client side, so I'm trying to fit the Meteor Streamer's payload
        if (payload) this.publication._session.socket?.send(payload);
    };

    stop(): void {
        this.listeners.forEach(this.off);
        clients.delete(this.publication.connection);
    }

    static getClient(publication: IPublication, streamer: IStreamer<'user-presence'>): [UserPresence, boolean] {
        const { connection } = publication;
        const stored = clients.get(connection);

        const client = stored || new UserPresence(publication, streamer);

        const main = Boolean(!stored);

        clients.set(connection, client);

        return [client, main];
    }
}

export class StreamPresence {
    // eslint-disable-next-line @typescript-eslint/naming-convention
    static getInstance(Streamer: IStreamerConstructor, name = 'user-presence'): IStreamer<'user-presence'> {
        return new (class StreamPresence extends Streamer<'user-presence'> {
            async _publish(
                publication: IPublication,
                _eventName: string,
                options: boolean | { useCollection?: boolean; args?: any } = false,
            ): Promise<void> {
                const { added, removed } = (typeof options !== 'boolean' ? options : {}) as unknown as UserPresenceStreamProps;

                const [client, main] = UserPresence.getClient(publication, this);

                added?.forEach((uid) => client.listen(uid));
                removed?.forEach((uid) => client.off(uid));

                if (!main) {
                    publication.stop();
                    return;
                }

                publication.ready();

                publication.onStop(() => client.stop());
            }
        } as any)(name);
    }
}

export const emit = (uid: string, args: UserPresenceStreamArgs['args']): void => {
    e.emit(uid, { uid, args });
};