RocketChat/Rocket.Chat

View on GitHub
ee/apps/ddp-streamer/src/Server.ts

Summary

Maintainability
B
6 hrs
Test Coverage
import { EventEmitter } from 'events';

import type { IServiceMetrics } from '@rocket.chat/core-services';
import { MeteorService, isMeteorError, MeteorError } from '@rocket.chat/core-services';
import { Logger } from '@rocket.chat/logger';
import ejson from 'ejson';
import { v1 as uuidv1 } from 'uuid';
import WebSocket from 'ws';

import type { Client } from './Client';
import { Publication } from './Publication';
import { DDP_EVENTS } from './constants';
import type { IPacket } from './types/IPacket';

const logger = new Logger('DDP-Streamer');

type SubscriptionFn = (this: Publication, eventName: string, options: object) => void;
type MethodFn = (this: Client, ...args: any[]) => any;
type Methods = {
    [k: string]: MethodFn;
};

const handleInternalException = (err: unknown, msg: string): MeteorError => {
    if (err instanceof MeteorError) {
        return err;
    }

    // default errors are logged to the console and redacted from the client
    // TODO switch to using the logger (ideally broker.logger)
    logger.error({ msg, err });

    return new MeteorError(500, 'Internal server error');
};

export const SERVER_ID = ejson.stringify({ server_id: '0' });

export class Server extends EventEmitter {
    private _subscriptions = new Map<string, SubscriptionFn>();

    private _methods = new Map<string, MethodFn>();

    private metrics?: IServiceMetrics;

    public readonly id = uuidv1();

    serialize = ejson.stringify;

    parse = (data: WebSocket.Data, isBinary: boolean): IPacket => {
        if (isBinary) {
            throw new MeteorError(500, 'Binary data not supported');
        }
        const packet = data.toString();

        const payload = packet.startsWith('[') ? JSON.parse(packet)[0] : packet;
        return ejson.parse(payload);
    };

    setMetrics(metrics: IServiceMetrics): void {
        this.metrics = metrics;
    }

    async call(client: Client, packet: IPacket): Promise<void> {
        // if client is not connected we don't need to do anything
        if (client.ws.readyState !== WebSocket.OPEN) {
            return;
        }
        try {
            // if method was not defined on DDP Streamer we fall back to Meteor
            if (!this._methods.has(packet.method)) {
                const result = await MeteorService.callMethodWithToken(client.userId, client.userToken, packet.method, packet.params);
                if (result?.result) {
                    return this.result(client, packet, result.result);
                }

                throw new MeteorError(404, `Method '${packet.method}' not found`);
            }

            const fn = this._methods.get(packet.method);
            if (!fn) {
                throw new MeteorError(404, `Method '${packet.method}' not found`);
            }

            const result = await fn.apply(client, packet.params);
            return this.result(client, packet, result);
        } catch (err: unknown) {
            return this.result(client, packet, null, handleInternalException(err, 'Method call error'));
        }
    }

    methods(obj: Methods): void {
        Object.entries(obj).forEach(([name, fn]) => {
            if (this._methods.has(name)) {
                return;
            }
            this._methods.set(name, fn);
        });
    }

    async subscribe(client: Client, packet: IPacket): Promise<void> {
        // if client is not connected we don't need to do anything
        if (client.ws.readyState !== WebSocket.OPEN) {
            return;
        }
        try {
            if (!this._subscriptions.has(packet.name)) {
                throw new MeteorError(404, `Subscription '${packet.name}' not found`);
            }
            const fn = this._subscriptions.get(packet.name);
            if (!fn) {
                throw new MeteorError(404, `Subscription '${packet.name}' not found`);
            }

            const end = this.metrics?.timer('rocketchat_subscription', { subscription: packet.name });

            const publication = new Publication(client, packet, this);
            const [eventName, options] = packet.params;
            await fn.call(publication, eventName, options);

            end?.();
        } catch (err: unknown) {
            return this.nosub(client, packet, handleInternalException(err, 'Subscription error'));
        }
    }

    publish(name: string, fn: SubscriptionFn): void {
        if (this._subscriptions.has(name)) {
            return;
        }
        this._subscriptions.set(name, fn);
    }

    stream(stream: string, fn: SubscriptionFn): void {
        return this.publish(`stream-${stream}`, fn);
    }

    result(client: Client, { id }: IPacket, result?: any, error?: Error | MeteorError): void {
        client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.RESULT,
                id,
                ...(result && { result }),
                ...(error && { error: isMeteorError(error) ? error.toJSON() : error }),
            }),
        );
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.UPDATED,
                [DDP_EVENTS.METHODS]: [id],
            }),
        );
    }

    nosub(client: Client, { id }: IPacket, error?: Error | MeteorError): void {
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.NO_SUBSCRIBE,
                id,
                ...(error && { error: isMeteorError(error) ? error.toJSON() : error }),
            }),
        );
    }

    ready(client: Client, packet: IPacket): void {
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.READY,
                [DDP_EVENTS.SUBSCRIPTIONS]: [packet.id],
            }),
        );
    }

    added(client: Client, collection: string, id: string, fields: any): void {
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.ADDED,
                [DDP_EVENTS.COLLECTION]: collection,
                [DDP_EVENTS.ID]: id,
                [DDP_EVENTS.FIELDS]: fields,
            }),
        );
    }

    changed(client: Client, collection: string, id: string, fields: any): void {
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.CHANGED,
                [DDP_EVENTS.COLLECTION]: collection,
                [DDP_EVENTS.ID]: id,
                [DDP_EVENTS.FIELDS]: fields,
            }),
        );
    }

    removed(client: Client, collection: string, id: string): void {
        return client.send(
            this.serialize({
                [DDP_EVENTS.MSG]: DDP_EVENTS.REMOVED,
                [DDP_EVENTS.COLLECTION]: collection,
                [DDP_EVENTS.ID]: id,
            }),
        );
    }
}