OpenHPS/openhps-core

View on GitHub
src/nodes/SourceNode.ts

Summary

Maintainability
A
1 hr
Test Coverage
import { DataFrame } from '../data/DataFrame';
import { DataObject } from '../data/object/DataObject';
import { SensorType } from '../data/object/SensorObject';
import { Node, NodeOptions } from '../Node';
import { GraphOptions, PullOptions, PushOptions } from '../graph/options';
import { SerializableObject, SerializableMember } from '../data/decorators';

/**
 * Source node
 *
 * ## Usage
 *
 * ### Creating a SourceNode
 * When creating a source node, you have to implement a promise based ```onPull``` method that expects a data
 * frame.
 *
 * As mentioned in the {@link Node} class, pulling normally does not require you to return
 * a data frame. The source node implementation provides an abstraction on top of this. If your source node can generate
 * data frames, you can resolve to a data frame. The data frame will then be pushed to outgoing nodes.
 * If not, you can simply resolve nothing or a null object.
 *
 * On top of this abstraction, a source node adds an intermediate output node that merges data objects from the [data service](#dataservice).
 * This way, the data frame pushed by the source will always be up-to-date and merged with existing processed information.
 *
 * ```typescript
 * import { DataFrame, SourceNode } from '@openhps/core';
 *
 * export class CustomSource<Out extends DataFrame> extends SourceNode<Out> {
 * // ...
 * constructor() {
 * // Source nodes expect a source object to be provided
 * super(new DataObject("mobile_input")));
 * }
 *
 * public onPull(options?: GraphPullOptions): Promise<Out> {
 * return new Promise<Out>((resolve, reject) => {
 * // ... pull request
 * // ... get data from somewhere
 *
 * const dataFrame = new DataFrame(this.getSource());
 * resolve(dataFrame);
 * });
 * }
 * }
 * ```
 * @category Source node
 */
@SerializableObject()
export abstract class SourceNode<Out extends DataFrame = DataFrame> extends Node<Out, Out> {
    @SerializableMember()
    protected declare options: SourceNodeOptions;

    /**
     * Construct a new source node
     * @param {SourceNodeOptions} [options] Source node options
     */
    constructor(options?: SourceNodeOptions) {
        super(options);

        // Default source settings
        this.options.persistence = this.options['persistence'] === undefined ? true : this.options.persistence;

        this.on('push', this._onPush.bind(this));
        this.on('pull', this._onPull.bind(this));

        if (this.source) {
            this.once('build', this.registerService.bind(this));
        }
    }

    /**
     * Get the source data object
     * @returns {DataObject} Source data object
     */
    protected get source(): DataObject {
        return this.options.source;
    }

    protected registerService(): Promise<void> {
        return new Promise<void>((resolve) => {
            const service = this.model.findDataService(this.source);
            // Update source when modified
            service.on('insert', (uid: string, object: DataObject) => {
                if (uid === this.source.uid) {
                    this.options.source = object;
                }
            });

            // Update to the latest version
            service
                .findByUID(this.source.uid)
                .then((object: DataObject) => {
                    this.options.source = object;
                    resolve();
                })
                .catch(() => {
                    // Ignore, most likely not calibrated or stored yet
                    resolve();
                });
        });
    }

    private _onPush(data: Out | Out[], options?: PushOptions): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            const servicePromises: Array<Promise<DataFrame>> = [];

            if (this.options.persistence) {
                if (data instanceof Array) {
                    for (const f of data) {
                        servicePromises.push(this.mergeFrame(f));
                    }
                } else {
                    servicePromises.push(this.mergeFrame(data));
                }
            }

            Promise.all(servicePromises)
                .then(() => {
                    this.outlets.map((outlet) => outlet.push(data, options));
                    resolve();
                })
                .catch(reject);
        });
    }

    protected mergeFrame(frame: DataFrame): Promise<DataFrame> {
        return new Promise<DataFrame>((resolve, reject) => {
            const defaultService = this.model.findDataService(DataObject);
            const promises: Array<Promise<void>> = [];
            const objects: DataObject[] = [];
            frame.getObjects().forEach((object) => {
                objects.push(object);
            });
            objects.forEach((object) => {
                promises.push(
                    new Promise((objResolve) => {
                        let service = this.model.findDataService(object);
                        if (service === null || service === undefined) {
                            service = defaultService;
                        }
                        service
                            .findByUID(object.uid)
                            .then((existingObject: DataObject) => {
                                if (existingObject === null) {
                                    objResolve();
                                }

                                this.mergeObject(object, existingObject);
                                objResolve();
                            })
                            .catch(() => {
                                // Ignore
                                objResolve();
                            });
                    }),
                );
            });

            Promise.all(promises)
                .then(() => {
                    resolve(frame);
                })
                .catch(reject);
        });
    }

    /**
     * Merge an object
     * @param {DataObject} newObject New object
     * @param {DataObject} oldObject Existing object
     * @returns {DataObject} Existing object
     */
    protected mergeObject(newObject: DataObject, oldObject: DataObject): DataObject {
        newObject.displayName = newObject.displayName || oldObject.displayName;
        newObject.position = newObject.position || oldObject.position;
        newObject.parentUID = newObject.parentUID || oldObject.parentUID;
        oldObject.relativePositions.forEach((relativePosition) => {
            // Get the new relative position by its uid and type
            const newPosition = newObject.getRelativePosition(
                relativePosition.referenceObjectUID,
                relativePosition.constructor.name,
            );

            if (newPosition && newPosition.timestamp < relativePosition.timestamp) {
                // New object contains older relative position
                newObject.addRelativePosition(relativePosition);
            } else if (!newPosition) {
                // New object does not contain stored relative position
                newObject.addRelativePosition(relativePosition);
            }
        });
        return newObject;
    }

    private _onPull(options: PullOptions = {}): Promise<void> {
        if (options.sourceNode && options.sourceNode !== this.uid) {
            // Pull options indicate the pull on a specific source node
            return Promise.resolve();
        }

        const sequential = options['sequentialPull'] === undefined ? true : options.sequentialPull;
        if (sequential) {
            return this._onSequentialPull(options);
        } else {
            return this._onParallelPull(options);
        }
    }

    private _onSequentialPull(options: PullOptions): Promise<void> {
        const newOptions: PushOptions = {
            sourceNode: this.uid,
            // Expand options after setting source node
            // original source node should not be overridden
            ...(options as GraphOptions),
        };
        const count = options.count || 1;

        let promise = Promise.resolve();
        for (let i = 0; i < count; i++) {
            promise = promise.then(
                () =>
                    new Promise((resolve, reject) => {
                        this.onPull(options)
                            .then((frame) => {
                                if (frame !== undefined && frame !== null) {
                                    // Resolve after push is done
                                    return this.push(frame, newOptions);
                                } else {
                                    resolve();
                                }
                            })
                            .then(resolve)
                            .catch(reject);
                    }),
            );
        }
        return promise;
    }

    private _onParallelPull(options: PullOptions): Promise<void> {
        return new Promise((resolve, reject) => {
            const newOptions: PushOptions = {
                sourceNode: this.uid,
                // Expand options after setting source node
                // original source node should not be overridden
                ...(options as GraphOptions),
            };
            const count = options.count || 1;

            Promise.all([...Array(count).keys()].map(() => this.onPull(options)))
                .then((results) => {
                    const pushPromises: Array<Promise<void>> = [];
                    results.forEach((frame) => {
                        if (frame !== undefined && frame !== null) {
                            // Push without waiting
                            pushPromises.push(this.push(frame, newOptions));
                        }
                    });
                    return Promise.all(pushPromises);
                })
                .then(() => {
                    resolve();
                })
                .catch(reject);
        });
    }

    public abstract onPull(options?: PullOptions): Promise<Out | undefined>;
}

export interface SourceNodeOptions extends NodeOptions {
    /**
     * Merge objects from persisted source
     * @default true
     */
    persistence?: boolean;
    /**
     * Source data object
     */
    source?: DataObject;
}

export interface ActiveSourceOptions extends SourceNodeOptions {
    /**
     * Push interval
     */
    interval?: number;
    /**
     * Auto start the sensor
     */
    autoStart?: boolean;
    /**
     * Stopping the source will halt the pushing of frames but not stop the data collection.
     */
    softStop?: boolean;
}

export interface SensorSourceOptions extends ActiveSourceOptions {
    /**
     * Sensor types
     */
    sensors?: SensorType[];
}