polkadot-js/api

View on GitHub
packages/rpc-provider/src/substrate-connect/index.spec.ts

Summary

Maintainability
D
1 day
Test Coverage
// Copyright 2017-2024 @polkadot/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0

/// <reference types="@polkadot/dev-test/globals.d.ts" />

import type * as Sc from '@substrate/connect';
import type { HealthChecker, SmoldotHealth } from './types.js';

import { noop, stringify } from '@polkadot/util';

import { ScProvider } from './index.js';

/**
 * Fake chain handed out by the mocked client: the regular `Sc.Chain` surface
 * plus underscore-prefixed hooks that let the tests inspect and drive it.
 */
interface MockChain extends Sc.Chain {
  /** Returns the spec (or well-known chain name) the fake chain was created with. */
  _spec: () => string;
  /** Returns every request passed to `sendJsonRpc` so far, oldest first (name is misspelled, kept as-is). */
  _recevedRequests: () => string[];
  /** Returns true once `remove()` has completed on this chain. */
  _isTerminated: () => boolean;
  /** Delivers a message to the chain's JSON-RPC callback; non-string values are stringified first. */
  _triggerCallback: (response: string | object) => void;
  /** Installs a hook that runs at the start of `remove()`. */
  _setTerminateInterceptor: (fn: () => void) => void;
  /** Installs a hook that runs for each `sendJsonRpc` call, before the request is recorded. */
  _setSendJsonRpcInterceptor: (fn: (rpc: string) => void) => void;
  /** Returns the most recently recorded request. */
  _getLatestRequest: () => string;
}

/**
 * A `HealthChecker` test double with extra hooks for observing whether it is
 * running and for pushing health updates by hand.
 */
interface MockedHealthChecker extends HealthChecker {
  /** Returns true between `start()` and `stop()`. */
  _isActive: () => boolean;
  /** Passes `update` to the callback that was registered via `start()`. */
  _triggerHealthUpdate: (update: SmoldotHealth) => void;
}

/**
 * Shape of the mocked `@substrate/connect` module passed to `ScProvider`,
 * extended with a helper for reaching the most recently created fake chain.
 */
type MockSc = typeof Sc & {
  /** Returns the last chain added to the last created fake client. */
  latestChain: () => MockChain;
};

// Local enum exposed as `WellKnownChain` on the mocked module built by
// `connectorFactory` (presumably mirrors the enum exported by
// `@substrate/connect` - confirm against that package when updating).
enum WellKnownChain {
  polkadot = 'polkadot',
  ksmcc3 = 'ksmcc3',
  rococo_v2_2 = 'rococo_v2_2',
  westend2 = 'westend2'
}

/**
 * Returns a promise that settles (with no value) once a `setTimeout` of `ms`
 * milliseconds has fired. `wait(0)` is used to let pending work run first.
 */
const wait = (ms: number): Promise<unknown> =>
  new Promise((done) => {
    setTimeout(done, ms);
  });

/**
 * Builds a health-checker test double.
 *
 * The returned object records whether it is running, remembers the health
 * callback given to `start()` and the sender given to `setSendJsonRpc()`, and
 * passes responses through untouched. `_triggerHealthUpdate` lets a test push
 * a health update to the remembered callback.
 *
 * @returns a fresh, inactive mocked health checker
 */
function healthCheckerMock (): MockedHealthChecker {
  // both handlers start out as no-ops until the consumer registers real ones
  let onHealth: (health: SmoldotHealth) => void = () => undefined;
  let forward: (request: string) => void = () => undefined;
  let running = false;

  const mock: MockedHealthChecker = {
    _isActive: () => running,
    _triggerHealthUpdate: (update: SmoldotHealth) => {
      onHealth(update);
    },
    responsePassThrough: (response) => response,
    sendJsonRpc: (...args) => forward(...args),
    setSendJsonRpc: (sender) => {
      forward = sender;
    },
    start: (healthCb) => {
      running = true;
      onHealth = healthCb;
    },
    // note: stopping only flips the flag, the health callback stays registered
    stop: () => {
      running = false;
    }
  };

  return mock;
}

/**
 * Creates a factory of mocked health checkers that remembers every instance
 * it hands out.
 *
 * @returns `healthChecker` (creates and records a new mock),
 * `_latestHealthChecker` (the last mock created, `undefined` if none yet) and
 * `_healthCheckers` (the live list of all created mocks)
 */
function healthCheckerFactory () {
  const _healthCheckers: MockedHealthChecker[] = [];

  const healthChecker = () => {
    const created = healthCheckerMock();

    _healthCheckers.push(created);

    return created;
  };

  const _latestHealthChecker = () => _healthCheckers[_healthCheckers.length - 1];

  return { _healthCheckers, _latestHealthChecker, healthChecker };
}

/**
 * Builds a fake chain that records the requests it receives and exposes hooks
 * for driving it from a test.
 *
 * @param spec chain spec (or well-known chain name) reported back by `_spec()`
 * @param callback JSON-RPC callback invoked by `_triggerCallback`
 * @returns the fake chain
 */
function getFakeChain (spec: string, callback: Sc.JsonRpcCallback): MockChain {
  const _receivedRequests: string[] = [];
  let _isTerminated = false;

  // Typed no-op defaults. These used to be `Function.prototype`, which is also
  // a callable no-op but has the untyped `Function` type, so calls to the
  // interceptors were not checked by the compiler.
  let terminateInterceptor: () => void = () => undefined;
  let sendJsonRpcInterceptor: (rpc: string) => void = () => undefined;

  return {
    _getLatestRequest: () => _receivedRequests[_receivedRequests.length - 1],
    _isTerminated: () => _isTerminated,
    _recevedRequests: () => _receivedRequests,
    _setSendJsonRpcInterceptor: (fn) => {
      sendJsonRpcInterceptor = fn;
    },
    _setTerminateInterceptor: (fn) => {
      terminateInterceptor = fn;
    },
    _spec: () => spec,
    _triggerCallback: (response) => {
      callback(
        typeof response === 'string'
          ? response
          : stringify(response)
      );
    },
    addChain: (chainSpec, jsonRpcCallback) =>
      Promise.resolve(getFakeChain(chainSpec, jsonRpcCallback ?? noop)),
    remove: () => {
      // the interceptor runs first, so a throwing interceptor leaves the
      // chain un-terminated
      terminateInterceptor();
      _isTerminated = true;
    },
    sendJsonRpc: (rpc) => {
      // the interceptor runs first, so a throwing interceptor means the
      // request is never recorded
      sendJsonRpcInterceptor(rpc);
      _receivedRequests.push(rpc);
    }
  };
}

/**
 * Builds a fake client whose `addChain` / `addWellKnownChain` resolve to fake
 * chains and keep a record of every chain created.
 *
 * Each method waits on its own "interceptor" promise before creating the
 * chain, so a test can delay or reject chain creation by swapping that
 * promise through the matching `_set…Interceptor` hook.
 *
 * @returns the fake client together with its test hooks
 */
function getFakeClient () {
  const chains: MockChain[] = [];
  let addChainInterceptor: Promise<void> = Promise.resolve();
  let addWellKnownChainInterceptor: Promise<void> = Promise.resolve();

  // creates a fake chain and records it before handing it back
  const track = (spec: string, cb: Sc.JsonRpcCallback): MockChain => {
    const created = getFakeChain(spec, cb);

    chains.push(created);

    return created;
  };

  return {
    _chains: () => chains,
    _setAddChainInterceptor: (interceptor: Promise<void>) => {
      addChainInterceptor = interceptor;
    },
    _setAddWellKnownChainInterceptor: (interceptor: Promise<void>) => {
      addWellKnownChainInterceptor = interceptor;
    },
    addChain: (chainSpec: string, cb: Sc.JsonRpcCallback): Promise<MockChain> =>
      addChainInterceptor.then(() => track(chainSpec, cb)),
    addWellKnownChain: (
      wellKnownChain: string,
      cb: Sc.JsonRpcCallback
    ): Promise<MockChain> =>
      addWellKnownChainInterceptor.then(() => track(wellKnownChain, cb))
  };
}

/**
 * Builds the mocked connector module handed to `ScProvider`.
 *
 * `createScClient` returns a new fake client each time and records it;
 * `latestClient` / `latestChain` give the tests access to the most recently
 * created client and to the last chain added to it.
 *
 * @returns the mock, cast to `MockSc` since it only implements the parts used here
 */
function connectorFactory (): MockSc {
  const clients: ReturnType<typeof getFakeClient>[] = [];
  const latestClient = () => clients[clients.length - 1];

  const createScClient = () => {
    const created = getFakeClient();

    clients.push(created);

    return created;
  };

  const latestChain = () => {
    const chains = latestClient()._chains();

    return chains[chains.length - 1];
  };

  const mocked = {
    WellKnownChain,
    _clients: () => clients,
    createScClient,
    latestChain,
    latestClient
  };

  return mocked as unknown as MockSc;
}

/**
 * Pushes a health update to the most recently created mocked health checker.
 * The update always reports one peer and `shouldHavePeers: true`; only the
 * syncing flag varies.
 *
 * @param isSyncing value reported as the chain's syncing state
 */
function setChainSyncyingStatus (isSyncing: boolean): void {
  const health = { isSyncing, peers: 1, shouldHavePeers: true };

  getCurrentHealthChecker()._triggerHealthUpdate(health);
}

// Suite-wide fixtures; both are assigned in the `beforeAll` hook of the
// `ScProvider` suite below.
let mockSc: MockSc;
let mockedHealthChecker: ReturnType<typeof healthCheckerFactory>;
// Returns the most recently created mocked health checker.
const getCurrentHealthChecker = () => mockedHealthChecker._latestHealthChecker();

describe('ScProvider', () => {
  // The mocks are created once for the whole suite, so every test shares the
  // same lists of fake clients and health checkers. Tests therefore always
  // read the *latest* chain / health checker right after connecting.
  beforeAll(() => {
    mockSc = connectorFactory();
    mockedHealthChecker = healthCheckerFactory();
  });

  // Event listeners registered through `provider.on(...)`. The provider is
  // moved between syncing / not syncing via the mocked health checker.
  describe('on', () => {
    it('emits `connected` as soon as the chain is not syncing', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      const onConnected = jest.fn();

      provider.on('connected', onConnected);

      // nothing is emitted until a health update reports `isSyncing: false`
      expect(onConnected).not.toHaveBeenCalled();
      setChainSyncyingStatus(false);
      expect(onConnected).toHaveBeenCalled();
    });

    it('stops receiving notifications after unsubscribing', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      const onConnected = jest.fn();

      // `on` returns an unsubscribe function, which is invoked straight away
      provider.on('connected', onConnected)();
      expect(onConnected).not.toHaveBeenCalled();

      setChainSyncyingStatus(false);
      expect(onConnected).not.toHaveBeenCalled();
    });

    it('synchronously emits connected if the Provider is already `connected`', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      // the chain stops syncing before the listener is registered
      setChainSyncyingStatus(false);

      const onConnected = jest.fn();

      provider.on('connected', onConnected);
      expect(onConnected).toHaveBeenCalled();
    });

    it('emits `disconnected` once the chain goes back to syncing', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      setChainSyncyingStatus(false);

      const onConnected = jest.fn();
      const onDisconnected = jest.fn();

      provider.on('connected', onConnected);
      provider.on('disconnected', onDisconnected);

      expect(onConnected).toHaveBeenCalled();
      expect(onDisconnected).not.toHaveBeenCalled();

      // clear the recorded call so the next assertion only sees new emissions
      onConnected.mockReset();
      setChainSyncyingStatus(true);

      expect(onConnected).not.toHaveBeenCalled();
      expect(onDisconnected).toHaveBeenCalled();
    });
  });

  // The provider advertises subscription support.
  describe('hasSubscriptions', () => {
    it('supports subscriptions', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);

      const { hasSubscriptions } = scProvider;

      expect(hasSubscriptions).toBe(true);
    });
  });

  // Cloning is expected to throw.
  describe('clone', () => {
    it('can not be clonned', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);

      const attemptClone = () => scProvider.clone();

      expect(attemptClone).toThrow();
    });
  });

  // Re-connecting: allowed (and a no-op for the chain) while still syncing,
  // rejected once the provider is connected.
  describe('connect', () => {
    it('does not create a new chain when trying to re-connect while the current chain is syncing', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);
      const firstChain = mockSc.latestChain();

      // no health update was pushed, so the provider is not `connected` yet
      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);
      expect(firstChain).toBe(mockSc.latestChain());
    });

    it('throws when trying to connect on an already connected Provider', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);

      setChainSyncyingStatus(false);

      const secondAttempt = scProvider.connect(undefined, mockedHealthChecker.healthChecker);

      await expect(secondAttempt).rejects.toThrow(/Already connected/);
    });
  });

  // Disconnecting removes the underlying chain and may be repeated safely.
  describe('disconnect', () => {
    it('removes the chain and cleans up', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);
      const connectedChain = mockSc.latestChain();

      await scProvider.disconnect();

      const terminated = connectedChain._isTerminated();

      expect(terminated).toBe(true);
    });

    // the test passes as long as neither `disconnect()` call rejects
    // eslint-disable-next-line jest/expect-expect
    it('does not throw when disconnecting on an already disconnected Provider', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);
      await scProvider.disconnect();
      await scProvider.disconnect();
    });
  });

  // Request/response round trips through the fake chain.
  describe('send', () => {
    it('throws when trying to send a request while the Provider is not connected', async () => {
      const provider = new ScProvider(mockSc, '');

      // no health update is pushed after connecting
      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      await expect(provider.send('', [])).rejects.toThrow();
    });

    it('receives responses to its requests', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);

      const responsePromise = provider.send<unknown>('getData', ['foo']);

      // wait one timer tick before inspecting what the fake chain received
      await wait(0);
      expect(chain._getLatestRequest()).toEqual(
        '{"id":1,"jsonrpc":"2.0","method":"getData","params":["foo"]}'
      );

      const result = { foo: 'foo' };

      // answer the request with a matching id
      chain._triggerCallback({
        id: 1,
        jsonrpc: '2.0',
        result
      });

      const response = await responsePromise;

      expect(response).toEqual(result);
    });

    it("rejects when the response can't be deserialized", async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);

      // scheduled reply that carries neither a `result` nor an `error`
      setTimeout(() => {
        chain._triggerCallback({
          id: 1,
          jsonrpc: '2.0'
        });
      }, 0);

      await expect(provider.send('getData', ['foo'])).rejects.toThrow();
    });

    it('rejects when the smoldot chain has crashed', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);
      await wait(0);

      // simulate the crash: every request handed to the chain now throws
      chain._setSendJsonRpcInterceptor(() => {
        throw new Error('boom!');
      });

      await expect(
        provider.send('getData', ['foo'])
      ).rejects.toThrow(/Disconnected/);
      expect(provider.isConnected).toBe(false);
    });
  });

  // Subscription lifecycle. In each test the reply to the subscribe request
  // (id 1, carrying the subscription token) is scheduled with `setTimeout`
  // before `provider.subscribe(...)` is awaited.
  describe('subscribe', () => {
    it('subscribes and recives messages until it unsubscribes', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);

      const unsubscribeToken = 'unsubscribeToken';

      // scheduled reply to the subscribe request
      setTimeout(() => {
        chain._triggerCallback({
          id: 1,
          jsonrpc: '2.0',
          result: unsubscribeToken
        });
      }, 0);

      const cb = jest.fn();
      const token = await provider.subscribe(
        'foo',
        'chain_subscribeNewHeads',
        ['baz'],
        cb
      );

      expect(token).toBe(unsubscribeToken);
      expect(cb).not.toHaveBeenCalled();

      // first notification for the subscription
      chain._triggerCallback({
        jsonrpc: '2.0',
        method: 'foo',
        params: {
          result: 1,
          subscription: token
        }
      });
      expect(cb).toHaveBeenCalledTimes(1);
      expect(cb).toHaveBeenLastCalledWith(null, 1);

      // second notification
      chain._triggerCallback({
        jsonrpc: '2.0',
        method: 'foo',
        params: {
          result: 2,
          subscription: token
        }
      });
      expect(cb).toHaveBeenCalledTimes(2);
      expect(cb).toHaveBeenLastCalledWith(null, 2);

      // deliberately not awaited: no reply to the unsubscribe request is ever
      // triggered in this test, and the assertions below expect the callback
      // to be detached immediately
      provider
        .unsubscribe('foo', 'chain_unsubscribeNewHeads', unsubscribeToken)
        .catch(console.error);

      // a notification arriving after unsubscribing must be ignored
      chain._triggerCallback({
        jsonrpc: '2.0',
        method: 'foo',
        params: {
          result: 3,
          subscription: token
        }
      });
      expect(cb).toHaveBeenCalledTimes(2);
      expect(cb).toHaveBeenLastCalledWith(null, 2);
    });

    it('ignores subscription messages that were received before the subscription token', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);

      const unsubscribeToken = 'unsubscribeToken';

      // notification delivered before the subscribe request has been made
      chain._triggerCallback({
        jsonrpc: '2.0',
        method: 'foo',
        params: {
          result: 1,
          subscription: unsubscribeToken
        }
      });
      // scheduled reply to the subscribe request
      setTimeout(() => {
        chain._triggerCallback({
          id: 1,
          jsonrpc: '2.0',
          result: unsubscribeToken
        });
      }, 0);

      const cb = jest.fn();
      const token = await provider.subscribe(
        'foo',
        'chain_subscribeNewHeads',
        ['baz'],
        cb
      );

      expect(token).toBe(unsubscribeToken);
      expect(cb).not.toHaveBeenCalled();
    });

    it('emits the error when the message has an error', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);
      const chain = mockSc.latestChain();

      setChainSyncyingStatus(false);
      await wait(0);

      const unsubscribeToken = 'unsubscribeToken';

      // scheduled reply to the subscribe request
      setTimeout(() => {
        chain._triggerCallback({
          id: 1,
          jsonrpc: '2.0',
          result: unsubscribeToken
        });
      }, 0);

      const cb = jest.fn();
      const token = await provider.subscribe(
        'foo',
        'chain_subscribeNewHeads',
        ['baz'],
        cb
      );

      // notification that carries an `error` instead of a `result`
      chain._triggerCallback({
        jsonrpc: '2.0',
        method: 'foo',
        params: {
          error: 'boom',
          subscription: unsubscribeToken
        }
      });

      expect(token).toBe(unsubscribeToken);
      expect(cb).toHaveBeenCalledTimes(1);
      expect(cb).toHaveBeenLastCalledWith(expect.any(Error), undefined);
    });

    it('errors when subscribing to an unsupported method', async () => {
      const provider = new ScProvider(mockSc, '');

      await provider.connect(undefined, mockedHealthChecker.healthChecker);

      setChainSyncyingStatus(false);

      await wait(0);
      await expect(
        provider.subscribe('foo', 'bar', ['baz'], () => undefined)
      ).rejects.toThrow(/Unsupported subscribe method: bar/);
    });
  });

  // Unsubscribing from a subscription the provider does not know about.
  describe('unsubscribe', () => {
    it('rejects when trying to unsubscribe from un unexisting subscription', async () => {
      const scProvider = new ScProvider(mockSc, '');

      await scProvider.connect(undefined, mockedHealthChecker.healthChecker);

      setChainSyncyingStatus(false);

      const attempt = scProvider.unsubscribe('', '', '');

      await expect(attempt).rejects.toThrow(/Unable to find active subscription/);
    });
  });

  // End-to-end check of what happens to an active subscription when the chain
  // goes back to syncing and then recovers: the old subscription is only
  // unsubscribed, and a new one created, once the chain stops syncing again.
  it('cleans up the stale subscriptions once it reconnects', async () => {
    const provider = new ScProvider(mockSc, '');

    await provider.connect(undefined, mockedHealthChecker.healthChecker);
    const chain = mockSc.latestChain();

    // setting the syncing status of the chain to false so that the Provider
    // gets `connected`
    setChainSyncyingStatus(false);

    // while connected we create a subscription
    const unsubscribeToken = 'unsubscribeToken';

    // scheduled reply to the subscribe request, carrying the subscription token
    setTimeout(() => {
      chain._triggerCallback({
        id: 1,
        jsonrpc: '2.0',
        result: unsubscribeToken
      });
    }, 0);

    const cb = jest.fn();
    const token = await provider.subscribe(
      'foo',
      'chain_subscribeNewHeads',
      ['baz'],
      cb
    );

    // setting the syncing status of the chain to true so that the Provider
    // gets `disconnected`
    setChainSyncyingStatus(true);

    // let's wait some time in order to ensure that the stale unsubscription
    // messages are not sent until the chain syncing status changes back to false
    await wait(200);

    // before we let the healthChecker know that the chain is no longer syncing,
    // let's make sure that the chain has received the correct request, and
    // most importantly that it has not received a request for unsubscribing
    // from the stale subscription, since that request should happen once the
    // chain is no longer syncing
    expect(chain._recevedRequests()).toEqual([
      '{"id":1,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}'
    ]);

    // let's change the sync status back to false
    setChainSyncyingStatus(false);

    // let's wait one tick to ensure that the microtasks got processed
    await wait(0);

    // let's make sure that we have now sent the request for killing the
    // stale subscription, followed by a fresh subscribe request
    expect(chain._recevedRequests()).toEqual([
      '{"id":1,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}',
      `{"id":2,"jsonrpc":"2.0","method":"chain_unsubscribeNewHeads","params":["${token}"]}`,
      '{"id":3,"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":["baz"]}'
    ]);
  });
});