RocketChat/Rocket.Chat

View on GitHub
ee/apps/ddp-streamer/src/Client.ts

Summary

Maintainability
D
2 days
Test Coverage
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';

import type { ISocketConnection } from '@rocket.chat/core-typings';
import { v1 as uuidv1 } from 'uuid';
import type WebSocket from 'ws';

import { SERVER_ID } from './Server';
import { server } from './configureServer';
import { DDP_EVENTS, WS_ERRORS, WS_ERRORS_MESSAGES, TIMEOUT } from './constants';
import type { IPacket } from './types/IPacket';

// TODO why localhost not as 127.0.0.1?
// based on Meteor's implementation (link)
/**
 * Resolves the address to report for the client behind an incoming request.
 *
 * The HTTP_FORWARDED_COUNT environment variable tells how many proxy hops are
 * expected in the `x-forwarded-for` header (e.g. "1" when the server sits
 * behind a single proxy). When it is unset, unparsable or zero, the socket's
 * remote address is used directly.
 *
 * @param req - The incoming HTTP request that initiated the connection.
 * @returns The resolved address, or `undefined` when the header is missing or
 * holds fewer entries than the configured hop count.
 */
const getClientAddress = (req: IncomingMessage): string | undefined => {
    // Read on every call; this could be computed once at startup instead.
    const hopCount = parseInt(process.env.HTTP_FORWARDED_COUNT || '') || 0;

    // No proxies expected: trust the socket itself.
    if (hopCount === 0) {
        return req.socket.remoteAddress;
    }

    // When the header arrives as an array only its first element is considered.
    const headerValue = req.headers['x-forwarded-for'];
    const rawChain = (Array.isArray(headerValue) ? headerValue[0] : headerValue) || '';
    if (!rawChain) {
        return undefined;
    }

    const addresses: string[] = [];
    for (const entry of rawChain.trim().split(',')) {
        addresses.push(entry.trim());
    }

    // The leading entries of `x-forwarded-for` can be spoofed by the end user
    // pretending to be a proxy. Counting HTTP_FORWARDED_COUNT entries back from
    // the end of the list yields the address reported by *our* first proxy.
    const position = addresses.length - hopCount;
    if (hopCount < 0 || position < 0) {
        return undefined;
    }

    return addresses[position];
};

// Looks up the Client wrapper for a given WebSocket. Each Client registers
// itself here at the end of its constructor; being a WeakMap, an entry does not
// keep its socket alive.
export const clientMap = new WeakMap<WebSocket, Client>();

/**
 * Server-side wrapper around one WebSocket speaking DDP.
 *
 * It wires the socket's `message`, `close` and `error` events, keeps an idle
 * timer that pings the peer and eventually closes the socket, and serializes
 * method calls and subscriptions through a per-client promise chain.
 *
 * Emits `message` (with the parsed packet) for every payload received and
 * `close` when the underlying socket closes.
 */
export class Client extends EventEmitter {
    // Tail of the per-client queue: method calls and subscriptions are appended
    // here so they are handed to the server one after another.
    private chain = Promise.resolve();

    // Current idle timer; either waiting to ping (handleIdle) or waiting to
    // close the socket (closeTimeout).
    protected timeout: NodeJS.Timeout;

    // Session identifier generated with uuid v1; sent back in the `connected` reply.
    public readonly session = uuidv1();

    // Subscriptions keyed by id. `process` looks entries up by `packet.id` and
    // calls `stop()` on them; they are presumably registered by the server's
    // subscribe handling — verify against the caller.
    public subscriptions = new Map();

    public connection: ISocketConnection;

    // While true, incoming packets are still emitted as `message` but their
    // processing is deferred until DDP_EVENTS.LOGGED is emitted on this client.
    public wait = false;

    public userId?: string;

    public userToken?: string;

    /**
     * @param ws - The WebSocket this client wraps.
     * @param meteorClient - When true, an `o` greeting frame is sent and every
     * outgoing payload is wrapped as `a[...]` (see `encodePayload`).
     * @param req - The HTTP request that opened the connection; used for the
     * client address and the HTTP headers stored on `connection`.
     */
    constructor(public ws: WebSocket, public meteorClient = false, req: IncomingMessage) {
        super();

        this.connection = {
            id: this.session,
            instanceId: server.id,
            onClose: (fn): void => {
                this.on('close', fn);
            },
            clientAddress: getClientAddress(req),
            httpHeaders: req.headers,
        };

        // NOTE(review): the first idle interval is TIMEOUT / 1000 while every
        // later renewal uses TIMEOUT — confirm the much shorter first interval
        // is intentional.
        this.renewTimeout(TIMEOUT / 1000);
        this.ws.on('message', this.handler);
        this.ws.on('close', (...args) => {
            server.emit(DDP_EVENTS.DISCONNECTED, this);
            this.emit('close', ...args);
            this.subscriptions.clear();
            clearTimeout(this.timeout);
        });

        this.ws.on('error', (err) => {
            console.error('Unexpected error:', err);
            this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR, WS_ERRORS_MESSAGES.CLOSE_PROTOCOL_ERROR);
        });

        this.setMaxListeners(50);

        this.greeting();

        server.emit(DDP_EVENTS.CONNECTED, this);

        // Any traffic from the peer counts as activity and restarts the idle timer.
        this.ws.on('message', () => this.renewTimeout(TIMEOUT));

        // The very first packet must be a DDP `connect`; anything else is a
        // protocol error and closes the socket.
        this.once('message', ({ msg }) => {
            if (msg !== DDP_EVENTS.CONNECT) {
                return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR, WS_ERRORS_MESSAGES.CLOSE_PROTOCOL_ERROR);
            }
            return this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.CONNECTED, session: this.session }));
        });

        this.send(SERVER_ID);

        clientMap.set(ws, this);
    }

    /** Sends the initial `o` frame to Meteor clients; other clients get no greeting. */
    greeting(): void {
        // no greeting by default
        if (this.meteorClient) {
            return this.ws.send('o');
        }
    }

    /**
     * Queues a method call behind the work already pending for this client.
     *
     * The returned promise resolves as soon as the call is queued, not when it
     * has run. A failing call is logged and swallowed so that it cannot leave
     * the queue rejected, which would silently skip every later call and
     * subscription on this connection.
     *
     * @param packet - The `method` packet to hand to the server.
     */
    async callMethod(packet: IPacket): Promise<void> {
        this.chain = this.chain
            .then(() => server.call(this, packet))
            .catch((err: unknown) => {
                console.error('Error calling method:', err);
            });
    }

    /**
     * Queues a subscription behind the work already pending for this client.
     *
     * Same queueing and error-isolation contract as `callMethod`.
     *
     * @param packet - The `sub` packet to hand to the server.
     */
    async callSubscribe(packet: IPacket): Promise<void> {
        this.chain = this.chain
            .then(() => server.subscribe(this, packet))
            .catch((err: unknown) => {
                console.error('Error subscribing:', err);
            });
    }

    /**
     * Dispatches one parsed packet according to its DDP message type.
     *
     * Packets missing a required field (`method`/`name`/`id`) close the socket
     * with a protocol error. Unknown actions are ignored.
     *
     * @param action - The packet's message type (`packet.msg`).
     * @param packet - The parsed packet.
     */
    process(action: string, packet: IPacket): void {
        switch (action) {
            case DDP_EVENTS.PING:
                this.pong(packet.id);
                break;
            case DDP_EVENTS.METHOD:
                if (!packet.method) {
                    return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
                }
                if (!packet.id) {
                    return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
                }
                this.callMethod(packet);
                break;
            case DDP_EVENTS.SUBSCRIBE:
                if (!packet.name) {
                    return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
                }
                if (!packet.id) {
                    return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
                }
                this.callSubscribe(packet);
                break;
            case DDP_EVENTS.UNSUBSCRIBE: {
                if (!packet.id) {
                    return this.ws.close(WS_ERRORS.CLOSE_PROTOCOL_ERROR);
                }
                // Unsubscribing from an unknown id is a no-op.
                const subscription = this.subscriptions.get(packet.id);
                if (!subscription) {
                    return;
                }
                subscription.stop();
                break;
            }
        }
    }

    /** Closes the socket with the timeout code; scheduled by `handleIdle`. */
    closeTimeout = (): void => {
        this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT);
    };

    /**
     * Sends a DDP `ping`.
     *
     * @param id - Optional id included in the packet when truthy.
     */
    ping(id?: string): void {
        this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) }));
    }

    /**
     * Sends a DDP `pong`.
     *
     * @param id - Optional id included in the packet when truthy.
     */
    pong(id?: string): void {
        this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PONG, ...(id && { [DDP_EVENTS.ID]: id }) }));
    }

    /**
     * Runs when the idle timer expires: pings the peer and gives it TIMEOUT
     * milliseconds to send anything before the socket is closed.
     */
    handleIdle = (): void => {
        this.ping();
        this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
    };

    /**
     * Restarts the idle timer.
     *
     * @param timeout - Delay in milliseconds before `handleIdle` runs.
     */
    renewTimeout(timeout = TIMEOUT): void {
        clearTimeout(this.timeout);
        this.timeout = setTimeout(this.handleIdle, timeout);
    }

    /**
     * Socket `message` listener: parses the payload, emits it as `message` and
     * processes it. While `wait` is set, processing is postponed until
     * DDP_EVENTS.LOGGED is emitted. A payload that fails to parse (or whose
     * synchronous processing throws) closes the socket as unsupported data.
     *
     * @param payload - Raw data received from the socket.
     * @param isBinary - Whether the frame was binary, forwarded to the parser.
     */
    handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
        try {
            const packet = server.parse(payload, isBinary);
            this.emit('message', packet);
            if (this.wait) {
                return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));
            }
            this.process(packet.msg, packet);
        } catch (err) {
            console.error(err);
            return this.ws.close(WS_ERRORS.UNSUPPORTED_DATA, WS_ERRORS_MESSAGES.UNSUPPORTED_DATA);
        }
    };

    /**
     * Frames an outgoing payload for the peer.
     *
     * @param payload - Already-serialized message.
     * @returns For Meteor clients, the payload wrapped as `a` followed by a
     * one-element JSON array; otherwise the payload unchanged.
     */
    encodePayload(payload: string): string {
        if (this.meteorClient) {
            return `a${JSON.stringify([payload])}`;
        }
        return payload;
    }

    /**
     * Encodes and writes a payload to the socket.
     *
     * @param payload - Already-serialized message.
     */
    send(payload: string): void {
        return this.ws.send(this.encodePayload(payload));
    }
}