RocketChat/Rocket.Chat

View on GitHub
packages/ddp-client/__tests__/DDPSDK.spec.ts

Summary

Maintainability
A
0 mins
Test Coverage
/* eslint-disable no-debugger */
import util from 'util';

import WS from 'jest-websocket-mock';
import { WebSocket } from 'ws';

import { DDPSDK } from '../src/DDPSDK';
import { fireStreamChange, fireStreamAdded, fireStreamRemove, handleConnection, handleSubscription, handleMethod } from './helpers';

(global as any).WebSocket = (global as any).WebSocket || WebSocket;

export let server: WS;

const callXTimes = <F extends (...args: any) => any>(fn: F, times: number): F => {
    return (async (...args) => {
        const methods = [].concat(...Array(times));
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        for (const _ of methods) {
            // eslint-disable-next-line no-await-in-loop
            await fn(...args);
        }
    }) as F;
};

beforeEach(async () => {
    server = new WS('ws://localhost:1234/websocket');
});

afterEach(() => {
    server.close();
    WS.clean();
    jest.useRealTimers();
});

it('should handle a stream of messages', async () => {
    const cb = jest.fn();

    const streamName = 'stream';
    const streamParams = '123';

    const sdk = DDPSDK.create('ws://localhost:1234');

    await handleConnection(server, sdk.connection.connect());

    const stream = sdk.stream(streamName, streamParams, cb);

    await handleSubscription(server, stream.id, streamName, streamParams);

    await stream.ready();

    fireStreamChange(server, streamName, streamParams);
    fireStreamChange(server, streamName, streamParams);
    fireStreamChange(server, streamName, streamParams);

    expect(cb).toBeCalledTimes(3);

    fireStreamChange(server, streamName, `${streamParams}-another`);

    expect(cb).toBeCalledTimes(3);

    expect(cb).toHaveBeenNthCalledWith(1, 1);
    sdk.connection.close();
});

it('should ignore messages other from changed', async () => {
    const cb = jest.fn();

    const streamName = 'stream';
    const streamParams = '123';

    const sdk = DDPSDK.create('ws://localhost:1234');

    await handleConnection(server, sdk.connection.connect());

    const stream = sdk.stream(streamName, streamParams, cb);

    await handleSubscription(server, stream.id, streamName, streamParams);

    await stream.ready();

    fireStreamAdded(server, streamName, streamParams);

    fireStreamRemove(server, streamName, streamParams);

    expect(cb).toBeCalledTimes(0);
    sdk.connection.close();
});

it('should handle streams after reconnect', async () => {
    const cb = jest.fn();

    const streamName = 'stream';
    const streamParams = '123';

    const sdk = DDPSDK.create('ws://localhost:1234');

    await handleConnection(server, sdk.connection.connect());

    const result = sdk.stream(streamName, streamParams, cb);

    expect(result.isReady).toBe(false);

    expect(sdk.client.subscriptions.size).toBe(1);

    await handleSubscription(server, result.id, streamName, streamParams);

    await result.ready();

    const fire = callXTimes(fireStreamChange, 3);

    await fire(server, streamName, streamParams);

    expect(cb).toBeCalledTimes(3);

    // Fake timers are used to avoid waiting for the reconnect timeout
    jest.useFakeTimers();
    server.close();
    WS.clean();

    server = new WS('ws://localhost:1234/websocket');

    const reconnect = new Promise((resolve) => sdk.connection.once('reconnecting', () => resolve(undefined)));
    const connecting = new Promise((resolve) => sdk.connection.once('connecting', () => resolve(undefined)));
    const connected = new Promise((resolve) => sdk.connection.once('connected', () => resolve(undefined)));
    await handleConnection(server, jest.advanceTimersByTimeAsync(1000), reconnect, connecting, connected);

    // the client should reconnect and resubscribe
    await Promise.all([handleSubscription(server, result.id, streamName, streamParams), jest.advanceTimersByTimeAsync(1000)]);

    fire(server, streamName, streamParams);
    await jest.advanceTimersByTimeAsync(1000);

    expect(cb).toBeCalledTimes(6);

    jest.useRealTimers();
    sdk.connection.close();
});

it('should handle an unsubscribe stream after reconnect', async () => {
    const cb = jest.fn();

    const streamName = 'stream';
    const streamParams = '123';

    const sdk = DDPSDK.create('ws://localhost:1234');

    await handleConnection(server, sdk.connection.connect());

    const subscription = sdk.stream(streamName, streamParams, cb);

    expect(subscription.isReady).toBe(false);

    expect(sdk.client.subscriptions.size).toBe(1);

    await handleSubscription(server, subscription.id, streamName, streamParams);

    await expect(subscription.ready()).resolves.toBe(undefined);

    expect(subscription.isReady).toBe(true);

    fireStreamChange(server, streamName, streamParams);
    fireStreamChange(server, streamName, streamParams);
    fireStreamChange(server, streamName, streamParams);

    expect(cb).toBeCalledTimes(3);

    // Fake timers are used to avoid waiting for the reconnect timeout
    jest.useFakeTimers();

    server.close();
    WS.clean();
    server = new WS('ws://localhost:1234/websocket');

    const reconnect = new Promise((resolve) => sdk.connection.once('reconnecting', () => resolve(undefined)));
    const connecting = new Promise((resolve) => sdk.connection.once('connecting', () => resolve(undefined)));
    const connected = new Promise((resolve) => sdk.connection.once('connected', () => resolve(undefined)));
    await handleConnection(server, jest.advanceTimersByTimeAsync(1000), reconnect, connecting, connected);

    await handleSubscription(server, subscription.id, streamName, streamParams);

    expect(subscription.isReady).toBe(true);

    fireStreamChange(server, streamName, streamParams);

    subscription.stop();

    expect(sdk.client.subscriptions.size).toBe(0);

    fireStreamChange(server, streamName, streamParams);
    fireStreamChange(server, streamName, streamParams);
    jest.advanceTimersByTimeAsync(1000);

    expect(cb).toBeCalledTimes(4);

    expect(sdk.client.subscriptions.size).toBe(0);
    jest.useRealTimers();
    sdk.connection.close();
    sdk.connection.close();
});

it('should create and connect to a stream', async () => {
    const promise = DDPSDK.createAndConnect('ws://localhost:1234');
    await handleConnection(server, promise);
    const sdk = await promise;
    sdk.connection.close();
});

it.skip('should try to loginWithToken after reconnection', async () => {
    const sdk = DDPSDK.create('ws://localhost:1234');

    await handleConnection(server, sdk.connection.connect());

    const messageResult = {
        id: 123,
        token: 'token1234',
        tokenExpires: { $date: 99999999 },
    };

    const { loginWithToken } = sdk.account;
    const loginFn = jest.fn((token: string) => loginWithToken.apply(sdk.account, [token]));
    sdk.account.loginWithToken = loginFn;

    await handleMethod(server, 'login', [{ resume: 'token' }], JSON.stringify(messageResult), sdk.account.loginWithToken('token'));

    expect(sdk.account.user?.token).toBe(messageResult.token);
    expect(loginFn).toHaveBeenCalledTimes(1);

    // Fake timers are used to avoid waiting for the reconnect timeout
    jest.useFakeTimers();

    server.close();
    WS.clean();
    server = new WS('ws://localhost:1234/websocket');

    const reconnect = new Promise((resolve) => sdk.connection.once('reconnecting', () => resolve(undefined)));
    const connecting = new Promise((resolve) => sdk.connection.once('connecting', () => resolve(undefined)));
    const connected = new Promise((resolve) => sdk.connection.once('connected', () => resolve(undefined)));
    await handleConnection(server, jest.advanceTimersByTimeAsync(1000), reconnect, connecting, connected);

    jest.advanceTimersByTimeAsync(1000);
    expect(loginFn).toHaveBeenCalledTimes(2);

    jest.useRealTimers();
    sdk.connection.close();
});

describe('Method call and Disconnection cases', () => {
    it('should handle properly if the message was sent after disconnection', async () => {
        const sdk = DDPSDK.create('ws://localhost:1234');

        await handleConnection(server, sdk.connection.connect());

        const [result] = await handleMethod(server, 'method', ['args1'], '1', sdk.client.callAsync('method', 'args1'));

        expect(result).toBe(1);
        // Fake timers are used to avoid waiting for the reconnect timeout
        jest.useFakeTimers();

        server.close();
        WS.clean();
        server = new WS('ws://localhost:1234/websocket');

        const reconnect = new Promise((resolve) => sdk.connection.once('reconnecting', () => resolve(undefined)));
        const connecting = new Promise((resolve) => sdk.connection.once('connecting', () => resolve(undefined)));
        const connected = new Promise((resolve) => sdk.connection.once('connected', () => resolve(undefined)));

        const callResult = sdk.client.callAsync('method', 'args2');

        expect(util.inspect(callResult).includes('pending')).toBe(true);

        await handleConnection(server, jest.advanceTimersByTimeAsync(1000), reconnect, connecting, connected);

        const [result2] = await handleMethod(server, 'method', ['args2'], '1', callResult);

        expect(util.inspect(callResult).includes('pending')).toBe(false);
        expect(result2).toBe(1);
        sdk.connection.close();
        jest.useRealTimers();
    });

    it.skip('should handle properly if the message was sent before disconnection but got disconnected before receiving the response', async () => {
        const sdk = DDPSDK.create('ws://localhost:1234');

        await handleConnection(server, sdk.connection.connect());

        const callResult = sdk.client.callAsync('method', 'args');

        expect(util.inspect(callResult).includes('pending')).toBe(true);

        // Fake timers are used to avoid waiting for the reconnect timeout
        jest.useFakeTimers();

        server.close();
        WS.clean();
        server = new WS('ws://localhost:1234/websocket');

        const reconnect = new Promise((resolve) => sdk.connection.once('reconnecting', () => resolve(undefined)));
        const connecting = new Promise((resolve) => sdk.connection.once('connecting', () => resolve(undefined)));
        const connected = new Promise((resolve) => sdk.connection.once('connected', () => resolve(undefined)));

        await handleConnection(server, jest.advanceTimersByTimeAsync(1000), reconnect, connecting, connected);

        expect(util.inspect(callResult).includes('pending')).toBe(true);

        const [result] = await handleMethod(server, 'method', ['args2'], '1', callResult);

        expect(result).toBe(1);

        jest.useRealTimers();
        sdk.connection.close();
    });
});