packages/rpc-core/src/bundle.ts
// Copyright 2017-2024 @polkadot/rpc-core authors & contributors
// SPDX-License-Identifier: Apache-2.0
import type { Observer } from 'rxjs';
import type { ProviderInterface, ProviderInterfaceCallback } from '@polkadot/rpc-provider/types';
import type { StorageKey, Vec } from '@polkadot/types';
import type { Hash } from '@polkadot/types/interfaces';
import type { AnyJson, AnyNumber, Codec, DefinitionRpc, DefinitionRpcExt, DefinitionRpcSub, Registry } from '@polkadot/types/types';
import type { Memoized } from '@polkadot/util/types';
import type { RpcCoreStats, RpcInterfaceMethod } from './types/index.js';
import { Observable, publishReplay, refCount } from 'rxjs';
import { rpcDefinitions } from '@polkadot/types';
import { hexToU8a, isFunction, isNull, isUndefined, lazyMethod, logger, memoize, objectSpread, u8aConcat, u8aToU8a } from '@polkadot/util';
import { drr, refCountDelay } from './util/index.js';
export { packageInfo } from './packageInfo.js';
export * from './util/index.js';
interface StorageChangeSetJSON {
block: string;
changes: [string, string | null][];
}
type MemoizedRpcInterfaceMethod = Memoized<RpcInterfaceMethod> & {
raw: Memoized<RpcInterfaceMethod>;
meta: DefinitionRpc;
}
interface Options {
isPedantic?: boolean;
provider: ProviderInterface;
userRpc?: Record<string, Record<string, DefinitionRpc | DefinitionRpcSub>>;
}
const l = logger('rpc-core');
const EMPTY_META = {
fallback: undefined,
modifier: { isOptional: true },
type: {
asMap: { linked: { isTrue: false } },
isMap: false
}
};
// utility method to create a nicely-formatted error
/** @internal */
function logErrorMessage (method: string, { noErrorLog, params, type }: DefinitionRpc, error: Error): void {
if (noErrorLog) {
return;
}
l.error(`${method}(${
params.map(({ isOptional, name, type }): string =>
`${name}${isOptional ? '?' : ''}: ${type}`
).join(', ')
}): ${type}:: ${error.message}`);
}
function isTreatAsHex (key: StorageKey): boolean {
// :code is problematic - it does not have the length attached, which is
// unlike all other storage entries where it is indeed properly encoded
return ['0x3a636f6465'].includes(key.toHex());
}
/**
* @name Rpc
* @summary The API may use a HTTP or WebSockets provider.
* @description It allows for querying a Polkadot Client Node.
* WebSockets provider is recommended since HTTP provider only supports basic querying.
*
* ```mermaid
* graph LR;
* A[Api] --> |WebSockets| B[WsProvider];
* B --> |endpoint| C[ws://127.0.0.1:9944]
* ```
*
* @example
* <BR>
*
* ```javascript
* import Rpc from '@polkadot/rpc-core';
* import { WsProvider } from '@polkadot/rpc-provider/ws';
*
* const provider = new WsProvider('ws://127.0.0.1:9944');
* const rpc = new Rpc(provider);
* ```
*/
export class RpcCore {
readonly #instanceId: string;
readonly #isPedantic: boolean;
readonly #registryDefault: Registry;
readonly #storageCache = new Map<string, Codec>();
#storageCacheHits = 0;
#storageCacheSize = 0;
#getBlockRegistry?: (blockHash: Uint8Array) => Promise<{ registry: Registry }>;
#getBlockHash?: (blockNumber: AnyNumber) => Promise<Uint8Array>;
readonly mapping = new Map<string, DefinitionRpcExt>();
readonly provider: ProviderInterface;
readonly sections: string[] = [];
/**
* @constructor
* Default constructor for the core RPC handler
* @param {ProviderInterface} provider An API provider using any of the supported providers (HTTP, SC or WebSocket)
*/
constructor (instanceId: string, registry: Registry, { isPedantic = true, provider, userRpc = {} }: Options) {
if (!provider || !isFunction(provider.send)) {
throw new Error('Expected Provider to API create');
}
this.#instanceId = instanceId;
this.#isPedantic = isPedantic;
this.#registryDefault = registry;
this.provider = provider;
const sectionNames = Object.keys(rpcDefinitions);
// these are the base keys (i.e. part of jsonrpc)
this.sections.push(...sectionNames);
// decorate all interfaces, defined and user on this instance
this.addUserInterfaces(userRpc);
}
/**
* @description Returns the connected status of a provider
*/
public get isConnected (): boolean {
return this.provider.isConnected;
}
/**
* @description Manually connect from the attached provider
*/
public connect (): Promise<void> {
return this.provider.connect();
}
/**
* @description Manually disconnect from the attached provider
*/
public disconnect (): Promise<void> {
return this.provider.disconnect();
}
/**
* @description Returns the underlying core stats, including those from teh provider
*/
public get stats (): RpcCoreStats | undefined {
const stats = this.provider.stats;
return stats
? {
...stats,
core: {
cacheHits: this.#storageCacheHits,
cacheSize: this.#storageCacheSize
}
}
: undefined;
}
/**
* @description Sets a registry swap (typically from Api)
*/
public setRegistrySwap (registrySwap: (blockHash: Uint8Array) => Promise<{ registry: Registry }>): void {
this.#getBlockRegistry = memoize(registrySwap, {
getInstanceId: () => this.#instanceId
});
}
/**
* @description Sets a function to resolve block hash from block number
*/
public setResolveBlockHash (resolveBlockHash: (blockNumber: AnyNumber) => Promise<Uint8Array>): void {
this.#getBlockHash = memoize(resolveBlockHash, {
getInstanceId: () => this.#instanceId
});
}
public addUserInterfaces (userRpc: Record<string, Record<string, DefinitionRpc | DefinitionRpcSub>>): void {
// add any extra user-defined sections
this.sections.push(...Object.keys(userRpc).filter((k) => !this.sections.includes(k)));
for (let s = 0, scount = this.sections.length; s < scount; s++) {
const section = this.sections[s];
const defs = objectSpread<Record<string, DefinitionRpc | DefinitionRpcSub>>({}, rpcDefinitions[section as 'babe'], userRpc[section]);
const methods = Object.keys(defs);
for (let m = 0, mcount = methods.length; m < mcount; m++) {
const method = methods[m];
const def = defs[method];
const jsonrpc = def.endpoint || `${section}_${method}`;
if (!this.mapping.has(jsonrpc)) {
const isSubscription = !!(def as DefinitionRpcSub).pubsub;
if (!(this as Record<string, unknown>)[section]) {
(this as Record<string, unknown>)[section] = {};
}
this.mapping.set(jsonrpc, objectSpread({}, def, { isSubscription, jsonrpc, method, section }));
lazyMethod(this[section as 'connect'], method, () =>
isSubscription
? this._createMethodSubscribe(section, method, def as DefinitionRpcSub)
: this._createMethodSend(section, method, def)
);
}
}
}
}
private _memomize (creator: <T> (isScale: boolean) => (...values: unknown[]) => Observable<T>, def: DefinitionRpc): MemoizedRpcInterfaceMethod {
const memoOpts = { getInstanceId: () => this.#instanceId };
const memoized = memoize(creator(true) as RpcInterfaceMethod, memoOpts);
memoized.raw = memoize(creator(false), memoOpts);
memoized.meta = def;
return memoized as MemoizedRpcInterfaceMethod;
}
private _formatResult <T> (isScale: boolean, registry: Registry, blockHash: string | Uint8Array | null | undefined, method: string, def: DefinitionRpc, params: Codec[], result: unknown): T {
return isScale
? this._formatOutput(registry, blockHash, method, def, params, result) as unknown as T
: result as T;
}
private _createMethodSend (section: string, method: string, def: DefinitionRpc): RpcInterfaceMethod {
const rpcName = def.endpoint || `${section}_${method}`;
const hashIndex = def.params.findIndex(({ isHistoric }) => isHistoric);
let memoized: null | MemoizedRpcInterfaceMethod = null;
// execute the RPC call, doing a registry swap for historic as applicable
const callWithRegistry = async <T> (isScale: boolean, values: unknown[]): Promise<T> => {
const blockId = hashIndex === -1
? null
: values[hashIndex];
const blockHash = blockId && def.params[hashIndex].type === 'BlockNumber'
? await this.#getBlockHash?.(blockId as AnyNumber)
: blockId as (Uint8Array | string | null | undefined);
const { registry } = isScale && blockHash && this.#getBlockRegistry
? await this.#getBlockRegistry(u8aToU8a(blockHash))
: { registry: this.#registryDefault };
const params = this._formatParams(registry, null, def, values);
// only cache .at(<blockHash>) queries, e.g. where valid blockHash was supplied
const result = await this.provider.send<AnyJson>(rpcName, params.map((p) => p.toJSON()), !!blockHash);
return this._formatResult(isScale, registry, blockHash, method, def, params, result);
};
const creator = <T> (isScale: boolean) => (...values: unknown[]): Observable<T> => {
const isDelayed = isScale && hashIndex !== -1 && !!values[hashIndex];
return new Observable((observer: Observer<T>): () => void => {
callWithRegistry<T>(isScale, values)
.then((value): void => {
observer.next(value);
observer.complete();
})
.catch((error: Error): void => {
logErrorMessage(method, def, error);
observer.error(error);
observer.complete();
});
return (): void => {
// delete old results from cache
if (isScale) {
memoized?.unmemoize(...values);
} else {
memoized?.raw.unmemoize(...values);
}
};
}).pipe(
// eslint-disable-next-line deprecation/deprecation
publishReplay(1), // create a Replay(1)
isDelayed
? refCountDelay() // Unsubscribe after delay
// eslint-disable-next-line deprecation/deprecation
: refCount()
);
};
memoized = this._memomize(creator, def);
return memoized;
}
// create a subscriptor, it subscribes once and resolves with the id as subscribe
private _createSubscriber ({ paramsJson, subName, subType, update }: { subType: string; subName: string; paramsJson: AnyJson[]; update: ProviderInterfaceCallback }, errorHandler: (error: Error) => void): Promise<number | string> {
return new Promise((resolve, reject): void => {
this.provider
.subscribe(subType, subName, paramsJson, update)
.then(resolve)
.catch((error: Error): void => {
errorHandler(error);
reject(error);
});
});
}
private _createMethodSubscribe (section: string, method: string, def: DefinitionRpcSub): RpcInterfaceMethod {
const [updateType, subMethod, unsubMethod] = def.pubsub;
const subName = `${section}_${subMethod}`;
const unsubName = `${section}_${unsubMethod}`;
const subType = `${section}_${updateType}`;
let memoized: null | MemoizedRpcInterfaceMethod = null;
const creator = <T> (isScale: boolean) => (...values: unknown[]): Observable<T> => {
return new Observable((observer: Observer<T>): () => void => {
// Have at least an empty promise, as used in the unsubscribe
let subscriptionPromise: Promise<number | string | null> = Promise.resolve(null);
const registry = this.#registryDefault;
const errorHandler = (error: Error): void => {
logErrorMessage(method, def, error);
observer.error(error);
};
try {
const params = this._formatParams(registry, null, def, values);
const update = (error?: Error | null, result?: unknown): void => {
if (error) {
logErrorMessage(method, def, error);
return;
}
try {
observer.next(this._formatResult(isScale, registry, null, method, def, params, result));
} catch (error) {
observer.error(error);
}
};
subscriptionPromise = this._createSubscriber({ paramsJson: params.map((p) => p.toJSON()), subName, subType, update }, errorHandler);
} catch (error) {
errorHandler(error as Error);
}
// Teardown logic
return (): void => {
// Delete from cache, so old results don't hang around
if (isScale) {
memoized?.unmemoize(...values);
} else {
memoized?.raw.unmemoize(...values);
}
// Unsubscribe from provider
subscriptionPromise
.then((subscriptionId): Promise<boolean> =>
isNull(subscriptionId)
? Promise.resolve(false)
: this.provider.unsubscribe(subType, unsubName, subscriptionId)
)
.catch((error: Error) => logErrorMessage(method, def, error));
};
}).pipe(drr());
};
memoized = this._memomize(creator, def);
return memoized;
}
private _formatParams (registry: Registry, blockHash: Uint8Array | string | null | undefined, def: DefinitionRpc, inputs: unknown[]): Codec[] {
const count = inputs.length;
const reqCount = def.params.filter(({ isOptional }) => !isOptional).length;
if (count < reqCount || count > def.params.length) {
throw new Error(`Expected ${def.params.length} parameters${reqCount === def.params.length ? '' : ` (${def.params.length - reqCount} optional)`}, ${count} found instead`);
}
const params = new Array<Codec>(count);
for (let i = 0; i < count; i++) {
params[i] = registry.createTypeUnsafe(def.params[i].type, [inputs[i]], { blockHash });
}
return params;
}
private _formatOutput (registry: Registry, blockHash: Uint8Array | string | null | undefined, method: string, rpc: DefinitionRpc, params: Codec[], result?: unknown): Codec | Codec[] {
if (rpc.type === 'StorageData') {
const key = params[0] as StorageKey;
return this._formatStorageData(registry, blockHash, key, result as string);
} else if (rpc.type === 'StorageChangeSet') {
const keys = params[0] as Vec<StorageKey>;
return keys
? this._formatStorageSet(registry, (result as StorageChangeSetJSON).block, keys, (result as StorageChangeSetJSON).changes)
: registry.createType('StorageChangeSet', result);
} else if (rpc.type === 'Vec<StorageChangeSet>') {
const jsonSet = (result as StorageChangeSetJSON[]);
const count = jsonSet.length;
const mapped = new Array<[Hash, Codec[]]>(count);
for (let i = 0; i < count; i++) {
const { block, changes } = jsonSet[i];
mapped[i] = [
registry.createType('BlockHash', block),
this._formatStorageSet(registry, block, params[0] as Vec<StorageKey>, changes)
];
}
// we only query at a specific block, not a range - flatten
return method === 'queryStorageAt'
? mapped[0][1]
: mapped as unknown as Codec[];
}
return registry.createTypeUnsafe(rpc.type, [result], { blockHash });
}
private _formatStorageData (registry: Registry, blockHash: Uint8Array | string | null | undefined, key: StorageKey, value: string | null): Codec {
const isEmpty = isNull(value);
// we convert to Uint8Array since it maps to the raw encoding, all
// data will be correctly encoded (incl. numbers, excl. :code)
const input = isEmpty
? null
: isTreatAsHex(key)
? value
: u8aToU8a(value);
return this._newType(registry, blockHash, key, input, isEmpty);
}
private _formatStorageSet (registry: Registry, blockHash: string, keys: Vec<StorageKey>, changes: [string, string | null][]): Codec[] {
// For StorageChangeSet, the changes has the [key, value] mappings
const count = keys.length;
const withCache = count !== 1;
const values = new Array<Codec>(count);
// multiple return values (via state.storage subscription), decode the
// values one at a time, all based on the supplied query types
for (let i = 0; i < count; i++) {
values[i] = this._formatStorageSetEntry(registry, blockHash, keys[i], changes, withCache, i);
}
return values;
}
private _formatStorageSetEntry (registry: Registry, blockHash: string, key: StorageKey, changes: [string, string | null][], withCache: boolean, entryIndex: number): Codec {
const hexKey = key.toHex();
const found = changes.find(([key]) => key === hexKey);
const isNotFound = isUndefined(found);
// if we don't find the value, this is our fallback
// - in the case of an array of values, fill the hole from the cache
// - if a single result value, don't fill - it is not an update hole
// - fallback to an empty option in all cases
if (isNotFound && withCache) {
const cached = this.#storageCache.get(hexKey);
if (cached) {
this.#storageCacheHits++;
return cached;
}
}
const value = isNotFound
? null
: found[1];
const isEmpty = isNull(value);
const input = isEmpty || isTreatAsHex(key)
? value
: u8aToU8a(value);
const codec = this._newType(registry, blockHash, key, input, isEmpty, entryIndex);
// store the retrieved result - the only issue with this cache is that there is no
// clearing of it, so very long running processes (not just a couple of hours, longer)
// will increase memory beyond what is allowed.
this.#storageCache.set(hexKey, codec);
this.#storageCacheSize++;
return codec;
}
private _newType (registry: Registry, blockHash: Uint8Array | string | null | undefined, key: StorageKey, input: string | Uint8Array | null, isEmpty: boolean, entryIndex = -1): Codec {
// single return value (via state.getStorage), decode the value based on the
// outputType that we have specified. Fallback to Raw on nothing
const type = key.outputType || 'Raw';
const meta = key.meta || EMPTY_META;
const entryNum = entryIndex === -1
? ''
: ` entry ${entryIndex}:`;
try {
return registry.createTypeUnsafe(type, [
isEmpty
? meta.fallback
// For old-style Linkage, we add an empty linkage at the end
? type.includes('Linkage<')
? u8aConcat(hexToU8a(meta.fallback.toHex()), new Uint8Array(2))
: hexToU8a(meta.fallback.toHex())
: undefined
: meta.modifier.isOptional
? registry.createTypeUnsafe(type, [input], { blockHash, isPedantic: this.#isPedantic })
: input
], { blockHash, isFallback: isEmpty && !!meta.fallback, isOptional: meta.modifier.isOptional, isPedantic: this.#isPedantic && !meta.modifier.isOptional });
} catch (error) {
throw new Error(`Unable to decode storage ${key.section || 'unknown'}.${key.method || 'unknown'}:${entryNum}: ${(error as Error).message}`);
}
}
}