OpenHPS/openhps-core

View on GitHub
src/nodes/CallbackNode.ts

Summary

Maintainability
A
1 hr
Test Coverage
import { DataFrame } from '../data/DataFrame';
import { GraphOptions, PullOptions, PushOptions } from '../graph/options';
import { Node, NodeOptions } from '../Node';

/**
 * @category Node
 */
export class CallbackNode<InOut extends DataFrame> extends Node<InOut, InOut> {
    pushCallback: (frame: InOut | InOut[], options?: PushOptions) => Promise<void> | void;
    pullCallback: (options?: PullOptions) => InOut | InOut[] | Promise<InOut | InOut[]>;
    protected options: CallbackNodeOptions;

    constructor(
        pushCallback: (frame: InOut | InOut[]) => void = () => true,
        pullCallback: () => InOut | InOut[] = () => null,
        options?: CallbackNodeOptions,
    ) {
        super(options);
        this.pushCallback = pushCallback;
        this.pullCallback = pullCallback;

        this.on('push', this._onPush.bind(this));
        this.on('pull', this._onPull.bind(this));

        this.options.autoPush = this.options.autoPush || true;
    }

    private _onPush(frame: InOut | InOut[], options?: PushOptions): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            Promise.resolve(this.pushCallback(frame, options))
                .then(() => {
                    if (this.options.autoPush) {
                        return Promise.all(this.outlets.map((outlet) => outlet.push(frame, options as GraphOptions)));
                    } else {
                        resolve();
                    }
                })
                .then(() => {
                    resolve();
                })
                .catch(reject);
        });
    }

    private _onPull(options?: PullOptions): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            Promise.resolve(this.pullCallback(options))
                .then((result) => {
                    if (result !== undefined && result !== null) {
                        // Push result
                        Promise.all(this.outlets.map((outlet) => outlet.push(result, options as GraphOptions)))
                            .then(() => {
                                resolve();
                            })
                            .catch(reject);
                    } else {
                        // Forward pull
                        Promise.all(this.inlets.map((inlet) => inlet.pull(options)))
                            .then(() => {
                                resolve();
                            })
                            .catch(reject);
                    }
                })
                .catch(reject);
        });
    }
}

export interface CallbackNodeOptions extends NodeOptions {
    /**
     * Automatically push data frames. If set to false it is expected that the
     * callback handles the pushing.
     * @default true
     */
    autoPush?: boolean;
}