polkadot-js/api

View on GitHub
packages/rpc-provider/src/substrate-connect/Health.ts

Summary

Maintainability
A
0 mins
Test Coverage
// Copyright 2017-2024 @polkadot/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0

import type { HealthChecker, SmoldotHealth } from './types.js';

import { stringify } from '@polkadot/util';

interface JSONRequest {
  id: string;
  jsonrpc: '2.0',
  method: string;
  params: unknown[];
}

/*
 * Creates a new health checker.
 *
 * The role of the health checker is to report to the user the health of a smoldot chain.
 *
 * In order to use it, start by creating a health checker, and call `setSendJsonRpc` to set the
 * way to send a JSON-RPC request to a chain. The health checker is disabled by default. Use
 * `start()` in order to start the health checks. The `start()` function must be passed a callback called
 * when an update to the health of the node is available.
 *
 * In order to send a JSON-RPC request to the chain, you **must** use the `sendJsonRpc` function
 * of the health checker. The health checker rewrites the `id` of the requests it receives.
 *
 * When the chain send a JSON-RPC response, it must be passed to `responsePassThrough()`. This
 * function intercepts the responses destined to the requests that have been emitted by the health
 * checker and returns `null`. If the response doesn't concern the health checker, the response is
 * simply returned by the function.
 *
 * # How it works
 *
 * The health checker periodically calls the `system_health` JSON-RPC call in order to determine
 * the health of the chain.
 *
 * In addition to this, as long as the health check reports that `isSyncing` is `true`, the
 * health checker also maintains a subscription to new best blocks using `chain_subscribeNewHeads`.
 * Whenever a new block is notified, a health check is performed immediately in order to determine
 * whether `isSyncing` has changed to `false`.
 *
 * Thanks to this subscription, the latency of the report of the switch from `isSyncing: true` to
 * `isSyncing: false` is very low.
 *
 */
export function healthChecker (): HealthChecker {
  // `null` if health checker is not started.
  let checker: null | InnerChecker = null;
  let sendJsonRpc: null | ((request: string) => void) = null;

  return {
    responsePassThrough: (jsonRpcResponse) => {
      if (checker === null) {
        return jsonRpcResponse;
      }

      return checker.responsePassThrough(jsonRpcResponse);
    },
    sendJsonRpc: (request) => {
      if (!sendJsonRpc) {
        throw new Error('setSendJsonRpc must be called before sending requests');
      }

      if (checker === null) {
        sendJsonRpc(request);
      } else {
        checker.sendJsonRpc(request);
      }
    },
    setSendJsonRpc: (cb) => {
      sendJsonRpc = cb;
    },
    start: (healthCallback) => {
      if (checker !== null) {
        throw new Error("Can't start the health checker multiple times in parallel");
      } else if (!sendJsonRpc) {
        throw new Error('setSendJsonRpc must be called before starting the health checks');
      }

      checker = new InnerChecker(healthCallback, sendJsonRpc);
      checker.update(true);
    },
    stop: () => {
      if (checker === null) {
        return;
      } // Already stopped.

      checker.destroy();
      checker = null;
    }
  };
}

class InnerChecker {
  #healthCallback: (health: SmoldotHealth) => void;
  #currentHealthCheckId: string | null = null;
  #currentHealthTimeout: ReturnType<typeof setTimeout> | null = null;
  #currentSubunsubRequestId: string | null = null;
  #currentSubscriptionId: string | null = null;
  #requestToSmoldot: (request: JSONRequest) => void;
  #isSyncing = false;
  #nextRequestId = 0;

  constructor (healthCallback: (health: SmoldotHealth) => void, requestToSmoldot: (request: string) => void) {
    this.#healthCallback = healthCallback;
    this.#requestToSmoldot = (request: JSONRequest) => requestToSmoldot(stringify(request));
  }

  sendJsonRpc = (request: string): void => {
    // Replace the `id` in the request to prefix the request ID with `extern:`.
    let parsedRequest: JSONRequest;

    try {
      parsedRequest = JSON.parse(request) as JSONRequest;
    } catch {
      return;
    }

    if (parsedRequest.id) {
      const newId = 'extern:' + stringify(parsedRequest.id);

      parsedRequest.id = newId;
    }

    this.#requestToSmoldot(parsedRequest);
  };

  responsePassThrough = (jsonRpcResponse: string): string | null => {
    let parsedResponse: {id: string, result?: SmoldotHealth, params?: { subscription: string }};

    try {
      parsedResponse = JSON.parse(jsonRpcResponse) as { id: string, result?: SmoldotHealth };
    } catch {
      return jsonRpcResponse;
    }

    // Check whether response is a response to `system_health`.
    if (parsedResponse.id && this.#currentHealthCheckId === parsedResponse.id) {
      this.#currentHealthCheckId = null;

      // Check whether query was successful. It is possible for queries to fail for
      // various reasons, such as the client being overloaded.
      if (!parsedResponse.result) {
        this.update(false);

        return null;
      }

      this.#healthCallback(parsedResponse.result);
      this.#isSyncing = parsedResponse.result.isSyncing;
      this.update(false);

      return null;
    }

    // Check whether response is a response to the subscription or unsubscription.
    if (
      parsedResponse.id &&
      this.#currentSubunsubRequestId === parsedResponse.id
    ) {
      this.#currentSubunsubRequestId = null;

      // Check whether query was successful. It is possible for queries to fail for
      // various reasons, such as the client being overloaded.
      if (!parsedResponse.result) {
        this.update(false);

        return null;
      }

      if (this.#currentSubscriptionId) {
        this.#currentSubscriptionId = null;
      } else {
        this.#currentSubscriptionId = parsedResponse.result as unknown as string;
      }

      this.update(false);

      return null;
    }

    // Check whether response is a notification to a subscription.
    if (
      parsedResponse.params &&
      this.#currentSubscriptionId &&
      parsedResponse.params.subscription === this.#currentSubscriptionId
    ) {
      // Note that after a successful subscription, a notification containing
      // the current best block is always returned. Considering that a
      // subscription is performed in response to a health check, calling
      // `startHealthCheck()` here will lead to a second health check.
      // It might seem redundant to perform two health checks in a quick
      // succession, but doing so doesn't lead to any problem, and it is
      // actually possible for the health to have changed in between as the
      // current best block might have been updated during the subscription
      // request.
      this.update(true);

      return null;
    }

    // Response doesn't concern us.
    if (parsedResponse.id) {
      const id: string = parsedResponse.id;

      // Need to remove the `extern:` prefix.
      if (!id.startsWith('extern:')) {
        throw new Error('State inconsistency in health checker');
      }

      const newId = JSON.parse(id.slice('extern:'.length)) as string;

      parsedResponse.id = newId;
    }

    return stringify(parsedResponse);
  };

  update = (startNow: boolean): void => {
    // If `startNow`, clear `#currentHealthTimeout` so that it is set below.
    if (startNow && this.#currentHealthTimeout) {
      clearTimeout(this.#currentHealthTimeout);
      this.#currentHealthTimeout = null;
    }

    if (!this.#currentHealthTimeout) {
      const startHealthRequest = () => {
        this.#currentHealthTimeout = null;

        // No matter what, don't start a health request if there is already one in progress.
        // This is sane to do because receiving a response to a health request calls `update()`.
        if (this.#currentHealthCheckId) {
          return;
        }

        // Actual request starting.
        this.#currentHealthCheckId = `health-checker:${this.#nextRequestId}`;
        this.#nextRequestId += 1;

        this.#requestToSmoldot({
          id: this.#currentHealthCheckId,
          jsonrpc: '2.0',
          method: 'system_health',
          params: []
        });
      };

      if (startNow) {
        startHealthRequest();
      } else {
        this.#currentHealthTimeout = setTimeout(startHealthRequest, 1000);
      }
    }

    if (
      this.#isSyncing &&
      !this.#currentSubscriptionId &&
      !this.#currentSubunsubRequestId
    ) {
      this.startSubscription();
    }

    if (
      !this.#isSyncing &&
      this.#currentSubscriptionId &&
      !this.#currentSubunsubRequestId
    ) {
      this.endSubscription();
    }
  };

  startSubscription = (): void => {
    if (this.#currentSubunsubRequestId || this.#currentSubscriptionId) {
      throw new Error('Internal error in health checker');
    }

    this.#currentSubunsubRequestId = `health-checker:${this.#nextRequestId}`;
    this.#nextRequestId += 1;

    this.#requestToSmoldot({
      id: this.#currentSubunsubRequestId,
      jsonrpc: '2.0',
      method: 'chain_subscribeNewHeads',
      params: []
    });
  };

  endSubscription = (): void => {
    if (this.#currentSubunsubRequestId || !this.#currentSubscriptionId) {
      throw new Error('Internal error in health checker');
    }

    this.#currentSubunsubRequestId = `health-checker:${this.#nextRequestId}`;
    this.#nextRequestId += 1;

    this.#requestToSmoldot({
      id: this.#currentSubunsubRequestId,
      jsonrpc: '2.0',
      method: 'chain_unsubscribeNewHeads',
      params: [this.#currentSubscriptionId]
    });
  };

  destroy = (): void => {
    if (this.#currentHealthTimeout) {
      clearTimeout(this.#currentHealthTimeout);
      this.#currentHealthTimeout = null;
    }
  };
}

export class HealthCheckError extends Error {
  readonly #cause: unknown;

  getCause (): unknown {
    return this.#cause;
  }

  constructor (response: unknown, message = 'Got error response asking for system health') {
    super(message);

    this.#cause = response;
  }
}