src/core/api-next/subscription.ts
import { Observable, ReplaySubject, Subscription } from "rxjs";
import { map } from "rxjs/operators";
import { IBApiNext, IBApiNextError, ItemListUpdate } from "../../api-next";
import { ConnectionState } from "../../api-next/common/connection-state";
import { IBApiNextItemListUpdate } from "./item-list-update";
/**
* @internal
*
* This class implements the management of a subscription on the TWS API.
*
* It provides a method to create an [[Observable]] for starting / stopping
* the TWS subscription and observing changes.
*
* The class will take care to call the request and cancel functions when needed.
*/
export class IBApiNextSubscription<T> {
  /**
   * Create a [[IBApiNextSubscription]] object.
   *
   * @param api The [[IBApiNext]] instance.
   * @param requestFunction A callback, invoked when the start request shall be sent to TWS.
   * @param cancelFunction A callback, invoked when the cancel request shall be sent to TWS.
   * @param cleanupFunction A callback, invoked when the last observer has unsubscribed from the subject.
   * @param instanceId Optional identifier of this subscription instance. It is only stored
   * (read-only) by this class and not otherwise used here.
   */
  constructor(
    private api: IBApiNext,
    private requestFunction: () => void,
    private cancelFunction: () => void,
    private cleanupFunction: () => void,
    public readonly instanceId?: string,
  ) {
    this.reqId = api.nextReqId;
  }

  /** The request id of this subscription. Replaced by a fresh id on re-subscription after an error. */
  public reqId: number;

  /** Number of active observers. */
  private observersCount = 0;

  /** The replay subject, holding the latest emitted value (buffer size 1). */
  private subject = new ReplaySubject<IBApiNextItemListUpdate<T>>(1);

  /**
   * True once an error has been sent to the subject.
   * Tracked locally to prepare for RxJS v8, which will remove subject.hasError.
   */
  private hasError = false;

  /** The last 'all' value as sent to subscribers. */
  private _lastAllValue?: T;

  /**
   * The [[Subscription]] on the connection state.
   * Defined only while a TWS request is active (between request and cancel).
   */
  private connectionState$?: Subscription;

  /** @internal True when the end-event on an enumeration request has been received, false otherwise. */
  public endEventReceived = false;

  /** Get the last 'all' value as sent to subscribers. */
  get lastAllValue(): T | undefined {
    return this._lastAllValue;
  }

  /** @internal Set the last 'all' value without publishing it to subscribers. For internal use only. */
  set lastAllValue(value: T) {
    this._lastAllValue = value;
  }

  /**
   * Send the next value to subscribers and remember its 'all' value.
   *
   * @param value The next value.
   */
  next(value: IBApiNextItemListUpdate<T>): void {
    this._lastAllValue = value.all;
    this.subject.next(value);
  }

  /** Signal to subscribers that the subscription is complete. */
  complete(): void {
    this.subject.complete();
  }

  /**
   * Send an error to subscribers, reset latest value to
   * undefined and cancel TWS subscription.
   *
   * @param error The [[IBApiNextError]] object.
   */
  error(error: IBApiNextError): void {
    delete this._lastAllValue;
    this.endEventReceived = false;
    this.hasError = true;
    // Erroring the subject terminates the observers; their teardown (see
    // createObservable) cancels the TWS subscription once the last one is gone.
    this.subject.error(error);
    // Covers the case where no teardown has cancelled yet. This is a no-op
    // when the TWS subscription has already been cancelled.
    this.cancelTwsSubscription();
  }

  /**
   * Create an Observable to start/stop the subscription on
   * TWS, receive update and error events.
   *
   * The TWS request is started when the first observer subscribes and
   * cancelled (followed by the cleanup callback) when the last one unsubscribes.
   *
   * @returns An Observable emitting the item list updates of this subscription.
   */
  createObservable(): Observable<ItemListUpdate<T>> {
    return new Observable<ItemListUpdate<T>>((subscriber) => {
      // an errored subject cannot emit anymore: start over with a
      // new subject and a new request id
      if (this.hasError) {
        this.hasError = false;
        this.subject = new ReplaySubject(1);
        this.reqId = this.api.nextReqId;
      }

      // subscribe on subject; the first value delivered to this observer
      // (possibly a replayed one) reports the complete 'all' value as 'added'
      const subscription$ = this.subject
        .pipe(
          map((val, index) => {
            return index === 0
              ? ({
                  all: val.all,
                  added: val.all,
                } as ItemListUpdate<T>)
              : val;
          }),
        )
        .subscribe(subscriber);

      // request from TWS if first subscriber
      if (this.observersCount === 0) {
        this.requestTwsSubscription();
      }
      this.observersCount++;

      // handle unsubscribe
      return (): void => {
        subscription$.unsubscribe();
        this.observersCount--;
        if (this.observersCount <= 0) {
          this.cancelTwsSubscription();
          this.cleanupFunction();
        }
      };
    });
  }

  /**
   * Subscribe on the connection state and invoke the TWS request function
   * each time the 'connected' state is signaled.
   *
   * Does nothing if the connection state subscription already exists.
   */
  private requestTwsSubscription(): void {
    if (this.connectionState$) {
      return;
    }
    this.connectionState$ = this.api.connectionState.subscribe((state) => {
      if (state === ConnectionState.Connected) {
        // values cached from before this request are dropped before requesting
        delete this._lastAllValue;
        this.endEventReceived = false;
        this.requestFunction();
      }
    });
  }

  /**
   * Unsubscribe from connection state and invoke the TWS cancel function.
   *
   * This is idempotent: if no TWS request is active (never requested or
   * already cancelled), nothing happens. Without this guard the cancel
   * request would be sent to TWS twice on error, once by the teardown of
   * the last observer and once by error() itself.
   */
  private cancelTwsSubscription(): void {
    const connectionState$ = this.connectionState$;
    if (!connectionState$) {
      return;
    }
    // clear the marker first, so a re-entrant call cannot cancel again
    delete this.connectionState$;
    connectionState$.unsubscribe();
    if (this.api.isConnected) {
      this.cancelFunction();
    }
  }
}