packages/rpc-provider/src/ws/index.ts
// Copyright 2017-2024 @polkadot/rpc-provider authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Class } from '@polkadot/util/types';
import type { EndpointStats, JsonRpcResponse, ProviderInterface, ProviderInterfaceCallback, ProviderInterfaceEmitCb, ProviderInterfaceEmitted, ProviderStats } from '../types.js';
import { EventEmitter } from 'eventemitter3';
import { isChildClass, isNull, isUndefined, logger, noop, objectSpread, stringify } from '@polkadot/util';
import { xglobal } from '@polkadot/x-global';
import { WebSocket } from '@polkadot/x-ws';
import { RpcCoder } from '../coder/index.js';
import defaults from '../defaults.js';
import { DEFAULT_CAPACITY, LRUCache } from '../lru.js';
import { getWSErrorString } from './errors.js';
interface SubscriptionHandler {
callback: ProviderInterfaceCallback;
type: string;
}
interface WsStateAwaiting {
callback: ProviderInterfaceCallback;
method: string;
params: unknown[];
start: number;
subscription?: SubscriptionHandler | undefined;
}
interface WsStateSubscription extends SubscriptionHandler {
method: string;
params: unknown[];
}
const ALIASES: Record<string, string> = {
chain_finalisedHead: 'chain_finalizedHead',
chain_subscribeFinalisedHeads: 'chain_subscribeFinalizedHeads',
chain_unsubscribeFinalisedHeads: 'chain_unsubscribeFinalizedHeads'
};
const RETRY_DELAY = 2_500;
const DEFAULT_TIMEOUT_MS = 60 * 1000;
const TIMEOUT_INTERVAL = 5_000;
const l = logger('api-ws');
/** @internal Clears a Record<*> of all keys, optionally with all callback on clear */
function eraseRecord<T> (record: Record<string, T>, cb?: (item: T) => void): void {
Object.keys(record).forEach((key): void => {
if (cb) {
cb(record[key]);
}
delete record[key];
});
}
/** @internal Creates a default/empty stats object */
function defaultEndpointStats (): EndpointStats {
return { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 };
}
/**
* # @polkadot/rpc-provider/ws
*
* @name WsProvider
*
* @description The WebSocket Provider allows sending requests using WebSocket to a WebSocket RPC server TCP port. Unlike the [[HttpProvider]], it does support subscriptions and allows listening to events such as new blocks or balance changes.
*
* @example
* <BR>
*
* ```javascript
* import Api from '@polkadot/api/promise';
* import { WsProvider } from '@polkadot/rpc-provider/ws';
*
* const provider = new WsProvider('ws://127.0.0.1:9944');
* const api = new Api(provider);
* ```
*
* @see [[HttpProvider]]
*/
export class WsProvider implements ProviderInterface {
readonly #callCache: LRUCache;
readonly #coder: RpcCoder;
readonly #endpoints: string[];
readonly #headers: Record<string, string>;
readonly #eventemitter: EventEmitter;
readonly #handlers: Record<string, WsStateAwaiting> = {};
readonly #isReadyPromise: Promise<WsProvider>;
readonly #stats: ProviderStats;
readonly #waitingForId: Record<string, JsonRpcResponse<unknown>> = {};
#autoConnectMs: number;
#endpointIndex: number;
#endpointStats: EndpointStats;
#isConnected = false;
#subscriptions: Record<string, WsStateSubscription> = {};
#timeoutId?: ReturnType<typeof setInterval> | null = null;
#websocket: WebSocket | null;
#timeout: number;
/**
* @param {string | string[]} endpoint The endpoint url. Usually `ws://ip:9944` or `wss://ip:9944`, may provide an array of endpoint strings.
* @param {number | false} autoConnectMs Whether to connect automatically or not (default). Provided value is used as a delay between retries.
* @param {Record<string, string>} headers The headers provided to the underlying WebSocket
* @param {number} [timeout] Custom timeout value used per request . Defaults to `DEFAULT_TIMEOUT_MS`
*/
constructor (endpoint: string | string[] = defaults.WS_URL, autoConnectMs: number | false = RETRY_DELAY, headers: Record<string, string> = {}, timeout?: number, cacheCapacity?: number) {
const endpoints = Array.isArray(endpoint)
? endpoint
: [endpoint];
if (endpoints.length === 0) {
throw new Error('WsProvider requires at least one Endpoint');
}
endpoints.forEach((endpoint) => {
if (!/^(wss|ws):\/\//.test(endpoint)) {
throw new Error(`Endpoint should start with 'ws://', received '${endpoint}'`);
}
});
this.#callCache = new LRUCache(cacheCapacity || DEFAULT_CAPACITY);
this.#eventemitter = new EventEmitter();
this.#autoConnectMs = autoConnectMs || 0;
this.#coder = new RpcCoder();
this.#endpointIndex = -1;
this.#endpoints = endpoints;
this.#headers = headers;
this.#websocket = null;
this.#stats = {
active: { requests: 0, subscriptions: 0 },
total: defaultEndpointStats()
};
this.#endpointStats = defaultEndpointStats();
this.#timeout = timeout || DEFAULT_TIMEOUT_MS;
if (autoConnectMs && autoConnectMs > 0) {
this.connectWithRetry().catch(noop);
}
this.#isReadyPromise = new Promise((resolve): void => {
this.#eventemitter.once('connected', (): void => {
resolve(this);
});
});
}
/**
* @summary `true` when this provider supports subscriptions
*/
public get hasSubscriptions (): boolean {
return !!true;
}
/**
* @summary `true` when this provider supports clone()
*/
public get isClonable (): boolean {
return !!true;
}
/**
* @summary Whether the node is connected or not.
* @return {boolean} true if connected
*/
public get isConnected (): boolean {
return this.#isConnected;
}
/**
* @description Promise that resolves the first time we are connected and loaded
*/
public get isReady (): Promise<WsProvider> {
return this.#isReadyPromise;
}
public get endpoint (): string {
return this.#endpoints[this.#endpointIndex];
}
/**
* @description Returns a clone of the object
*/
public clone (): WsProvider {
return new WsProvider(this.#endpoints);
}
protected selectEndpointIndex (endpoints: string[]): number {
return (this.#endpointIndex + 1) % endpoints.length;
}
/**
* @summary Manually connect
* @description The [[WsProvider]] connects automatically by default, however if you decided otherwise, you may
* connect manually using this method.
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async connect (): Promise<void> {
if (this.#websocket) {
throw new Error('WebSocket is already connected');
}
try {
this.#endpointIndex = this.selectEndpointIndex(this.#endpoints);
// the as here is Deno-specific - not available on the globalThis
this.#websocket = typeof xglobal.WebSocket !== 'undefined' && isChildClass(xglobal.WebSocket as unknown as Class<WebSocket>, WebSocket)
? new WebSocket(this.endpoint)
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore - WS may be an instance of ws, which supports options
: new WebSocket(this.endpoint, undefined, {
headers: this.#headers
});
if (this.#websocket) {
this.#websocket.onclose = this.#onSocketClose;
this.#websocket.onerror = this.#onSocketError;
this.#websocket.onmessage = this.#onSocketMessage;
this.#websocket.onopen = this.#onSocketOpen;
}
// timeout any handlers that have not had a response
this.#timeoutId = setInterval(() => this.#timeoutHandlers(), TIMEOUT_INTERVAL);
} catch (error) {
l.error(error);
this.#emit('error', error);
throw error;
}
}
/**
* @description Connect, never throwing an error, but rather forcing a retry
*/
public async connectWithRetry (): Promise<void> {
if (this.#autoConnectMs > 0) {
try {
await this.connect();
} catch {
setTimeout((): void => {
this.connectWithRetry().catch(noop);
}, this.#autoConnectMs);
}
}
}
/**
* @description Manually disconnect from the connection, clearing auto-connect logic
*/
// eslint-disable-next-line @typescript-eslint/require-await
public async disconnect (): Promise<void> {
// switch off autoConnect, we are in manual mode now
this.#autoConnectMs = 0;
try {
if (this.#websocket) {
// 1000 - Normal closure; the connection successfully completed
this.#websocket.close(1000);
}
} catch (error) {
l.error(error);
this.#emit('error', error);
throw error;
}
}
/**
* @description Returns the connection stats
*/
public get stats (): ProviderStats {
return {
active: {
requests: Object.keys(this.#handlers).length,
subscriptions: Object.keys(this.#subscriptions).length
},
total: this.#stats.total
};
}
public get endpointStats (): EndpointStats {
return this.#endpointStats;
}
/**
* @summary Listens on events after having subscribed using the [[subscribe]] function.
* @param {ProviderInterfaceEmitted} type Event
* @param {ProviderInterfaceEmitCb} sub Callback
* @return unsubscribe function
*/
public on (type: ProviderInterfaceEmitted, sub: ProviderInterfaceEmitCb): () => void {
this.#eventemitter.on(type, sub);
return (): void => {
this.#eventemitter.removeListener(type, sub);
};
}
/**
* @summary Send JSON data using WebSockets to configured HTTP Endpoint or queue.
* @param method The RPC methods to execute
* @param params Encoded parameters as applicable for the method
* @param subscription Subscription details (internally used)
*/
public send <T = any> (method: string, params: unknown[], isCacheable?: boolean, subscription?: SubscriptionHandler): Promise<T> {
this.#endpointStats.requests++;
this.#stats.total.requests++;
const [id, body] = this.#coder.encodeJson(method, params);
const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
let resultPromise: Promise<T> | null = isCacheable
? this.#callCache.get(cacheKey)
: null;
if (!resultPromise) {
resultPromise = this.#send(id, body, method, params, subscription);
if (isCacheable) {
this.#callCache.set(cacheKey, resultPromise);
}
} else {
this.#endpointStats.cached++;
this.#stats.total.cached++;
}
return resultPromise;
}
async #send <T> (id: number, body: string, method: string, params: unknown[], subscription?: SubscriptionHandler): Promise<T> {
return new Promise<T>((resolve, reject): void => {
try {
if (!this.isConnected || this.#websocket === null) {
throw new Error('WebSocket is not connected');
}
const callback = (error?: Error | null, result?: T): void => {
error
? reject(error)
: resolve(result as T);
};
l.debug(() => ['calling', method, body]);
this.#handlers[id] = {
callback,
method,
params,
start: Date.now(),
subscription
};
const bytesSent = body.length;
this.#endpointStats.bytesSent += bytesSent;
this.#stats.total.bytesSent += bytesSent;
this.#websocket.send(body);
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
reject(error);
}
});
}
/**
* @name subscribe
* @summary Allows subscribing to a specific event.
*
* @example
* <BR>
*
* ```javascript
* const provider = new WsProvider('ws://127.0.0.1:9944');
* const rpc = new Rpc(provider);
*
* rpc.state.subscribeStorage([[storage.system.account, <Address>]], (_, values) => {
* console.log(values)
* }).then((subscriptionId) => {
* console.log('balance changes subscription id: ', subscriptionId)
* })
* ```
*/
public subscribe (type: string, method: string, params: unknown[], callback: ProviderInterfaceCallback): Promise<number | string> {
this.#endpointStats.subscriptions++;
this.#stats.total.subscriptions++;
// subscriptions are not cached, LRU applies to .at(<blockHash>) only
return this.send<number | string>(method, params, false, { callback, type });
}
/**
* @summary Allows unsubscribing to subscriptions made with [[subscribe]].
*/
public async unsubscribe (type: string, method: string, id: number | string): Promise<boolean> {
const subscription = `${type}::${id}`;
// FIXME This now could happen with re-subscriptions. The issue is that with a re-sub
// the assigned id now does not match what the API user originally received. It has
// a slight complication in solving - since we cannot rely on the send id, but rather
// need to find the actual subscription id to map it
if (isUndefined(this.#subscriptions[subscription])) {
l.debug(() => `Unable to find active subscription=${subscription}`);
return false;
}
delete this.#subscriptions[subscription];
try {
return this.isConnected && !isNull(this.#websocket)
? this.send<boolean>(method, [id])
: true;
} catch {
return false;
}
}
#emit = (type: ProviderInterfaceEmitted, ...args: unknown[]): void => {
this.#eventemitter.emit(type, ...args);
};
#onSocketClose = (event: CloseEvent): void => {
const error = new Error(`disconnected from ${this.endpoint}: ${event.code}:: ${event.reason || getWSErrorString(event.code)}`);
if (this.#autoConnectMs > 0) {
l.error(error.message);
}
this.#isConnected = false;
if (this.#websocket) {
this.#websocket.onclose = null;
this.#websocket.onerror = null;
this.#websocket.onmessage = null;
this.#websocket.onopen = null;
this.#websocket = null;
}
if (this.#timeoutId) {
clearInterval(this.#timeoutId);
this.#timeoutId = null;
}
// reject all hanging requests
eraseRecord(this.#handlers, (h) => {
try {
h.callback(error, undefined);
} catch (err) {
// does not throw
l.error(err);
}
});
eraseRecord(this.#waitingForId);
// Reset stats for active endpoint
this.#endpointStats = defaultEndpointStats();
this.#emit('disconnected');
if (this.#autoConnectMs > 0) {
setTimeout((): void => {
this.connectWithRetry().catch(noop);
}, this.#autoConnectMs);
}
};
#onSocketError = (error: Event): void => {
l.debug(() => ['socket error', error]);
this.#emit('error', error);
};
#onSocketMessage = (message: MessageEvent<string>): void => {
l.debug(() => ['received', message.data]);
const bytesRecv = message.data.length;
this.#endpointStats.bytesRecv += bytesRecv;
this.#stats.total.bytesRecv += bytesRecv;
const response = JSON.parse(message.data) as JsonRpcResponse<string>;
return isUndefined(response.method)
? this.#onSocketMessageResult(response)
: this.#onSocketMessageSubscribe(response);
};
#onSocketMessageResult = (response: JsonRpcResponse<string>): void => {
const handler = this.#handlers[response.id];
if (!handler) {
l.debug(() => `Unable to find handler for id=${response.id}`);
return;
}
try {
const { method, params, subscription } = handler;
const result = this.#coder.decodeResponse<string>(response);
// first send the result - in case of subs, we may have an update
// immediately if we have some queued results already
handler.callback(null, result);
if (subscription) {
const subId = `${subscription.type}::${result}`;
this.#subscriptions[subId] = objectSpread({}, subscription, {
method,
params
});
// if we have a result waiting for this subscription already
if (this.#waitingForId[subId]) {
this.#onSocketMessageSubscribe(this.#waitingForId[subId]);
}
}
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
handler.callback(error as Error, undefined);
}
delete this.#handlers[response.id];
};
#onSocketMessageSubscribe = (response: JsonRpcResponse<unknown>): void => {
if (!response.method) {
throw new Error('No method found in JSONRPC response');
}
const method = ALIASES[response.method] || response.method;
const subId = `${method}::${response.params.subscription}`;
const handler = this.#subscriptions[subId];
if (!handler) {
// store the JSON, we could have out-of-order subid coming in
this.#waitingForId[subId] = response;
l.debug(() => `Unable to find handler for subscription=${subId}`);
return;
}
// housekeeping
delete this.#waitingForId[subId];
try {
const result = this.#coder.decodeResponse(response);
handler.callback(null, result);
} catch (error) {
this.#endpointStats.errors++;
this.#stats.total.errors++;
handler.callback(error as Error, undefined);
}
};
#onSocketOpen = (): boolean => {
if (this.#websocket === null) {
throw new Error('WebSocket cannot be null in onOpen');
}
l.debug(() => ['connected to', this.endpoint]);
this.#isConnected = true;
this.#resubscribe();
this.#emit('connected');
return true;
};
#resubscribe = (): void => {
const subscriptions = this.#subscriptions;
this.#subscriptions = {};
Promise.all(Object.keys(subscriptions).map(async (id): Promise<void> => {
const { callback, method, params, type } = subscriptions[id];
// only re-create subscriptions which are not in author (only area where
// transactions are created, i.e. submissions such as 'author_submitAndWatchExtrinsic'
// are not included (and will not be re-broadcast)
if (type.startsWith('author_')) {
return;
}
try {
await this.subscribe(type, method, params, callback);
} catch (error) {
l.error(error);
}
})).catch(l.error);
};
#timeoutHandlers = (): void => {
const now = Date.now();
const ids = Object.keys(this.#handlers);
for (let i = 0, count = ids.length; i < count; i++) {
const handler = this.#handlers[ids[i]];
if ((now - handler.start) > this.#timeout) {
try {
handler.callback(new Error(`No response received from RPC endpoint in ${this.#timeout / 1000}s`), undefined);
} catch {
// ignore
}
this.#endpointStats.timeout++;
this.#stats.total.timeout++;
delete this.#handlers[ids[i]];
}
}
};
}