src/core/api-next/subscription-registry.ts
import { Observable } from "rxjs";
import { EventName } from "../..";
import { IBApiNext, IBApiNextError, ItemListUpdate } from "../../api-next";
import { IBApiAutoConnection } from "./auto-connection";
import { IBApiNextMap } from "./map";
import { IBApiNextSubscription } from "./subscription";
/** An id that uniquely identifies the type of a subscription. */
export type SubscriptionTypeId = string;
/** An id that uniquely identifies the subscription instance. */
export type SubscriptionInstanceId = string;
/** The log tag */
const LOG_TAG = "IBApiNextSubscriptionRegistry";
/**
* @internal
*
* An entry on the subscription registry.
*/
class RegistryEntry {
/**
* Create a new [[RegistryEntry]] object.
*
* @param eventName The [[IBApi]] event name.
* @param callback The event callback handler.
*/
constructor(
public readonly eventName: EventName,
public readonly callback: (
subscriptions: Map<number, IBApiNextSubscription<unknown>>,
...eventArgs: unknown[]
) => void,
) {
this.listener = (...eventArgs) => {
this.callback(this.subscriptions, ...eventArgs);
};
}
/** The event listener on [[IBApi]]. */
public readonly listener: (...eventArgs: unknown[]) => void;
/** Map of all active subscriptions, with reqId as key. */
public readonly subscriptions: Map<number, IBApiNextSubscription<unknown>> =
new Map();
}
/**
* @internal
*
* The subscription registry as used by [[IBApiNext]].
*
* The subscription registry maintains the list of all currently
* registered subscriptions. See [[IBApiNext.register]] about how
* register a subscription.
*/
export class IBApiNextSubscriptionRegistry {
/**
* Create an [[IBApiNextSubscriptionRegistry]] instance.
*
* @param api The [[IBApiAutoConnection]] instance for event listener registration and
* invoking TWS API.
* @param apiNext The [[IBApiNext]] instance for observing the connection state.
*/
constructor(
private readonly api: IBApiAutoConnection,
private readonly apiNext: IBApiNext,
) {}
/** A Map containing the subscription registry, with event name as key. */
private readonly entires = new IBApiNextMap<EventName, RegistryEntry>();
/**
* Register a subscription.
*
* @param requestFunction A callback, invoked when the start request shall be send to TWS.
* @param cancelFunction A callback, invoked when the cancel request shall be send to TWS.
* @param eventHandler Array of IB API event, callback function to handle this event.
* @param instanceId When not undefined, this an id that uniquely identifies
* the subscription instance. This can be used to avoid creation of multiple subscriptions,
* that will end up on same TWS request (i.e. request same market data multiple times), but an
* existing subscription instance will be re-used if same instanceId does already exist.
* As a general rule: don't use instanceId when there is no reqId or when you want to return
* a Promise (single emitted value). Use it everywhere else.
*/
register<T>(
requestFunction: (reqId: number) => void,
cancelFunction: (reqId: number) => void | null | undefined,
eventHandler: [
EventName,
(
subscriptions: Map<number, IBApiNextSubscription<T>>,
...eventArgs: unknown[]
) => void,
][],
instanceId?: string,
): Observable<ItemListUpdate<T>> {
// get the existing registry entries, or add if not existing yet
const entries: RegistryEntry[] = [];
eventHandler.forEach((handler) => {
const eventName = handler[0];
const callback = handler[1];
const entry = this.entires.getOrAdd(eventName, () => {
const entry = new RegistryEntry(eventName, callback);
this.apiNext.logger.debug(
LOG_TAG,
`Add RegistryEntry for EventName.${eventName}`,
);
this.api.addListener(eventName, entry.listener);
return entry;
});
entries.push(entry);
});
// lookup subscription by instance id
let subscription: IBApiNextSubscription<T>;
if (instanceId) {
entries.forEach((entry) => {
const values = entry.subscriptions.values();
while (!subscription) {
const it = values.next();
if (it.done) {
break;
}
if (
(it.value as IBApiNextSubscription<T>).instanceId === instanceId
) {
subscription = it.value;
}
}
});
}
// create new subscription
if (!subscription) {
subscription = new IBApiNextSubscription<T>(
this.apiNext,
() => {
requestFunction(subscription.reqId);
},
() => {
if (cancelFunction) {
cancelFunction(subscription.reqId);
}
},
() => {
entries.forEach((entry) => {
entry.subscriptions.delete(subscription.reqId);
if (!entry.subscriptions.size) {
this.api.removeListener(entry.eventName, entry.listener);
this.apiNext.logger.debug(
LOG_TAG,
`Remove RegistryEntry for EventName.${entry.eventName}.`,
);
this.entires.delete(entry.eventName);
}
});
this.apiNext.logger.debug(
LOG_TAG,
`Deleted IBApiNextSubscription for ${subscription.reqId}.`,
);
},
instanceId,
);
entries.forEach((entry) => {
this.apiNext.logger.debug(
LOG_TAG,
`Add IBApiNextSubscription on EventName.${entry.eventName} for ${subscription.reqId}.`,
);
entry.subscriptions.set(subscription.reqId, subscription);
});
}
// create an observable on the subscription
return subscription.createObservable();
}
/**
* Dispatch an error into the subscription that owns the given request id.
*/
dispatchError(reqId: number, error: IBApiNextError): void {
this.entires.forEach((entry) => {
entry.subscriptions.get(reqId)?.error(error);
});
}
}