OpenHPS/openhps-core

View on GitHub
src/worker/WorkerHandler.ts

Summary

Maintainability
C
1 day
Test Coverage
import { BlobWorker, Pool, spawn, Thread, Worker } from 'threads';
import { PoolEvent } from 'threads/dist/master/pool';
import { Observable } from 'threads/observable';
import { DataFrame } from '../data/DataFrame';
import { DataSerializer } from '../data/DataSerializer';
import { PullOptions, PushOptions } from '../graph/options';
import { DataService } from '../service/DataService';
import { Service } from '../service/Service';
import { WorkerServiceCall, WorkerServiceProxy, WorkerServiceResponse } from '../service/WorkerServiceProxy';
import { WorkerData } from './WorkerData';
import { AsyncEventEmitter } from '../_internal/AsyncEventEmitter';
import { DummyDataService } from '../service/DummyDataService';
import { DummyService } from '../service/DummyService';
import { ModelGraph } from '../graph/_internal/implementations/ModelGraph';
import { WorkerOptions } from './WorkerOptions';
import { Model } from '../Model';
import { Serializable } from '../data/decorators';

// Ambient declaration for webpack's `__non_webpack_require__`, which inside a webpack bundle refers to
// the untouched Node `require`. It is not defined at runtime outside such a bundle, which is why
// WorkerHandler.build() probes it with `typeof` before calling it.
declare const __non_webpack_require__: typeof require;

/**
 * Main-thread handler for a pool of worker threads.
 *
 * The handler spawns and initializes the workers, forwards pull, push and method
 * invocations to the pool and re-emits worker output as 'pull', 'push' and 'event'
 * emissions. Service calls issued by a worker are executed on the services of the
 * main-thread model and the outcome is sent back to the calling worker.
 */
export class WorkerHandler extends AsyncEventEmitter {
    private _pool: Pool<Thread>;
    public config: WorkerData;
    public options: WorkerOptions;
    /** Per worker (keyed by thread id): function delivering a service call response to that worker */
    private _serviceOutputResponse: Map<number, (response: WorkerServiceResponse) => Promise<void>> = new Map();
    protected model: Model;

    /**
     * @param {Model} model Main-thread model whose services are shared with the workers
     * @param {WorkerOptions} options Worker options. Note that `timeout` is defaulted in place to
     *  10000 (milliseconds, per the threads `spawn` option) when it is not set
     * @param {WorkerData} config Worker data that is merged over the generated init data of each worker
     */
    constructor(model: Model, options: WorkerOptions, config: WorkerData) {
        super();
        this.model = model;
        this.config = config;
        this.options = options;
        this.options.timeout = this.options.timeout ?? 10000;
    }

    /**
     * Create the worker pool.
     *
     * When `NODE_ENV` is not defined, the worker path is resolved through Node and, if it points
     * to a TypeScript file, replaced by an inline script that registers ts-node before requiring
     * the file (the worker is then created as a blob worker).
     * @returns {Promise<void>} Resolves once the pool reports that it is initialized, rejects when
     *  the pool event stream reports an error (e.g. a worker that failed to spawn)
     */
    build(): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            if (typeof process.env.NODE_ENV === 'undefined') {
                // eslint-disable-next-line
                const NativeWorker = typeof __non_webpack_require__ === "function" ? __non_webpack_require__("worker_threads").Worker : eval("require")("worker_threads").Worker;
                if (NativeWorker) {
                    // NodeJS: 0 means "no limit" for the EventEmitter listener count
                    NativeWorker.defaultMaxListeners = 0;
                    const resolvedPath = require.resolve(this.options.worker);
                    if (resolvedPath.match(/\.tsx?$/i)) {
                        // Transpile
                        this.options.worker = `
                            require('ts-node').register();
                            require(${JSON.stringify(resolvedPath)});
                        `;
                        this.options.blob = true;
                    }
                }
            }

            this._pool = Pool(() => this._spawnWorker(), {
                size: this.options.poolSize || 4,
                concurrency: this.options.poolConcurrency || 2,
            });
            this._pool.events().subscribe({
                next: (value: PoolEvent<Thread>) => {
                    if (value.type === 'initialized') {
                        resolve();
                    }
                },
                // Without an error handler a failing worker would leave this promise pending forever
                error: reject,
            });
        });
    }

    /**
     * Terminate the worker pool.
     *
     * A graceful termination is attempted first; if it has not finished after 2500 ms the pool
     * is terminated forcefully.
     * @returns {Promise<void>} Resolves when the pool is terminated (or was never built)
     */
    destroy(): Promise<void> {
        return new Promise((resolve, reject) => {
            if (this._pool === undefined) {
                return resolve();
            }
            // Fallback: force the termination when the graceful attempt takes too long
            const timeout = setTimeout(() => {
                this._pool
                    .terminate(true)
                    .then(() => {
                        resolve();
                    })
                    .catch((ex) => {
                        reject(ex);
                    });
            }, 2500);
            this._pool
                .terminate()
                .then(() => {
                    clearTimeout(timeout);
                    resolve();
                })
                .catch((ex) => {
                    clearTimeout(timeout);
                    reject(ex);
                });
        });
    }

    /**
     * Queue a pull request on the worker pool.
     * @param {PullOptions} [options] Pull options passed to the worker
     * @returns {Promise<void>} Settles with the outcome of the worker's pull
     */
    pull(options?: PullOptions): Promise<void> {
        return new Promise((resolve, reject) => {
            // Pass the pull request to the worker
            this._pool
                .queue((worker: any) => {
                    const pullFn: (options?: PullOptions) => Promise<void> = worker.pull;
                    return pullFn(options);
                })
                .then(resolve)
                .catch(reject);
        });
    }

    /**
     * Queue a push of one or more data frames on the worker pool.
     * The frame(s) are serialized with the DataSerializer before they are handed to the worker.
     * @param {DataFrame | DataFrame[]} frame Data frame(s) to push
     * @param {PushOptions} [options] Push options passed to the worker
     * @returns {Promise<void>} Settles with the outcome of the worker's push
     */
    push(frame: DataFrame | DataFrame[], options?: PushOptions): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            this._pool
                .queue((worker: any) => {
                    const pushFn: (frame: any, options?: PushOptions) => Promise<void> = worker.push;
                    return pushFn(DataSerializer.serialize(frame), options);
                })
                .then(() => {
                    resolve();
                })
                .catch(reject);
        });
    }

    /**
     * Invoke a named method on a worker of the pool.
     * Every argument is serialized before the call; a defined result is deserialized again.
     * @param {string} methodName Name of the method to invoke
     * @param {...any} args Arguments of the method
     * @returns {Promise<Serializable<T> | Serializable<T>[] | void>} Deserialized result of the method
     */
    invokeMethod<T>(methodName: string, ...args: any[]): Promise<Serializable<T> | Serializable<T>[] | void> {
        return new Promise((resolve, reject) => {
            this._pool
                .queue((worker: any) => {
                    const invokeMethod: (
                        methodName: string,
                        ...args: any[]
                    ) => Promise<Serializable<T> | Serializable<T>[] | void> = worker.invokeMethod;
                    return invokeMethod(methodName, ...args.map((a) => DataSerializer.serialize(a)));
                })
                .then((result) => {
                    if (result !== undefined) {
                        resolve(DataSerializer.deserialize(result));
                    } else {
                        resolve(result);
                    }
                })
                .catch(reject);
        });
    }

    /**
     * Create the underlying worker: a blob worker when `options.blob` is set, otherwise a worker
     * created from `options.worker`. The 'typescript' type is mapped to 'classic'.
     * @returns {Worker} Created worker
     */
    protected createWorker(): Worker {
        const type = this.options.type === 'typescript' ? 'classic' : this.options.type;
        if (this.options.blob) {
            return new BlobWorker(this.options.worker as any, { type });
        }
        return new Worker(this.options.worker, { type });
    }

    /**
     * Spawn a single worker
     * This method can be called multiple times in a pool
     *
     * The worker output streams are subscribed to, the worker is initialized with the serialized
     * services and options, and the services reported by the worker are added to the model.
     * When the initialization fails the spawned thread is terminated again.
     * @returns {Promise<Thread>} Thread spawn promise
     */
    private _spawnWorker(): Promise<Thread> {
        return new Promise((resolve, reject) => {
            const worker = this.createWorker();
            spawn(worker, {
                timeout: this.options.timeout,
            })
                .then((thread: Thread) => {
                    const init: (workerData: WorkerData) => Promise<void> = (thread as any).init;
                    const pushOutput: () => Observable<any> = (thread as any).pushOutput;
                    const pullOutput: () => Observable<void> = (thread as any).pullOutput;
                    const serviceOutputCall: () => Observable<WorkerServiceCall> = (thread as any).serviceOutputCall;
                    const serviceInputCall: (call: WorkerServiceCall) => Promise<WorkerServiceResponse> = (
                        thread as any
                    ).serviceInputCall;
                    const eventOutput: () => Observable<any> = (thread as any).eventOutput;
                    const findAllServices: () => Promise<any[]> = (thread as any).findAllServices;

                    // NOTE(review): relies on the worker exposing `threadId` - confirm for non-Node workers
                    const threadId = (worker as any).threadId;
                    this._serviceOutputResponse.set(threadId, (thread as any).serviceOutputResponse);

                    // Subscribe to the workers pull, push and service functions
                    pullOutput().subscribe(this._onWorkerPull.bind(this));
                    pushOutput().subscribe(this._onWorkerPush.bind(this));
                    serviceOutputCall().subscribe(this._onWorkerService.bind(this, threadId));
                    eventOutput().subscribe(this._onWorkerEvent.bind(this));

                    // Initialize the worker
                    init({
                        directory: this.options.directory || __dirname,
                        services: this._getServices(),
                        imports: this.options.imports || [],
                        args: this.options.args || {},
                        type: this.options.type || 'classic',
                        methods: this.options.methods
                            ? this.options.methods.map((method) => {
                                  return {
                                      name: method.name,
                                      handlerFn: method.handler.toString(),
                                  };
                              })
                            : [],
                        // Explicit worker data takes precedence over the generated values above
                        ...this.config,
                    })
                        .then(() => {
                            return findAllServices();
                        })
                        .then((services: any[]) => {
                            this._addServices(services, serviceInputCall);
                            resolve(thread);
                        })
                        .catch((ex) => {
                            // Initialization failed: stop the thread so it does not linger,
                            // then report the original error regardless of the termination outcome
                            const fail = (): void => reject(ex);
                            Promise.resolve()
                                .then(() => Thread.terminate(thread))
                                .then(fail, fail);
                        });
                })
                .catch(reject);
        });
    }

    /**
     * Serialize the services of this model
     * Uses `options.services` when provided, otherwise all services of the model.
     * @returns {any[]} Services array with the uid, type name and (for data services) data type name
     */
    private _getServices(): any[] {
        // Serialize this model services to the worker
        const services: Service[] = this.options.services || this.model.findAllServices();
        return services.map((service) => {
            // Services are wrapped in a proxy. Get prototype
            const serviceBase = Object.getPrototypeOf(service);
            return {
                uid: service.uid,
                type: serviceBase.constructor.name,
                dataType: service instanceof DataService && service.dataType ? service.dataType.name : undefined,
            };
        });
    }

    /**
     * Add the services reported by a worker to the main-thread model.
     * Services whose uid is already known to the model are skipped; every other service is added
     * as a dummy (data) service wrapped in a proxy that forwards calls to the worker.
     * @param {any[]} services Serialized services of the worker
     * @param {Function} call Function that performs a service call on the worker
     */
    private _addServices(services: any[], call: (call: WorkerServiceCall) => Promise<WorkerServiceResponse>): void {
        const model = this.model as ModelGraph<any, any>;
        services
            .filter((service) => {
                // Services are identified by their uid everywhere else in this class
                const internalService = this.model.findService(service.uid) || this.model.findDataService(service.uid);
                return internalService === undefined;
            })
            .forEach((service) => {
                const proxy = new WorkerServiceProxy({
                    uid: service.uid,
                    callFunction: call,
                });
                if (service.dataType) {
                    const DataType = DataSerializer.findTypeByName(service.dataType);
                    model.addService(new DummyDataService(service.uid, DataType), proxy);
                } else {
                    model.addService(new DummyService(service.uid), proxy);
                }
            });
    }

    /**
     * Triggered for each service call that a worker performs on a main-thread service.
     * The call is executed on the matching service and the (serialized) result is sent back to
     * the worker. A failure response is sent when the service or method can not be found, or
     * when the call throws or rejects, so that the worker always receives an answer.
     * @param {number} threadId Identifier of the worker thread that performed the call
     * @param {WorkerServiceCall} value Service call
     */
    private _onWorkerService(threadId: number, value: WorkerServiceCall): void {
        const respond = (response: WorkerServiceResponse): void => {
            this._serviceOutputResponse.get(threadId)(response);
        };

        const service: Service =
            this.model.findDataService(value.serviceUID) || this.model.findService(value.serviceUID);
        const method = service ? (service as any)[value.method] : undefined;
        if (typeof method !== 'function') {
            respond({
                id: value.id,
                success: false,
                result: new Error(
                    service
                        ? `Method '${value.method}' does not exist on service '${value.serviceUID}'!`
                        : `Service '${value.serviceUID}' not found!`,
                ),
            });
            return;
        }

        // The executor runs synchronously; errors thrown while deserializing or calling reject the promise
        new Promise<any>((resolve) => {
            const params: any[] = (value.parameters || []).map((param: any) =>
                // Only serialized objects carry a '__type' marker; null and undefined are passed through
                param !== null && param !== undefined && param['__type']
                    ? DataSerializer.deserialize(param)
                    : param,
            );
            resolve(method.apply(service, params));
        })
            .then((output) => {
                const result = Array.isArray(output)
                    ? output.map((item) => DataSerializer.serialize(item))
                    : DataSerializer.serialize(output);
                respond({ id: value.id, success: true, result });
            })
            .catch((ex) => {
                respond({ id: value.id, success: false, result: ex });
            });
    }

    /**
     * Triggered for each event that a worker outputs; re-emitted as 'event'
     * @param {any} value Event name and payload
     */
    private _onWorkerEvent(value: { name: string; event: any }): void {
        this.emit('event', value);
    }

    /**
     * Triggered for each worker that requests a pull; re-emitted as 'pull'
     * @param {PullOptions} options Pull options
     */
    private _onWorkerPull(options?: PullOptions): void {
        this.emit('pull', options);
    }

    /**
     * Triggered for each worker that pushes data; the data is deserialized and re-emitted as 'push'
     * NOTE(review): this is registered as a plain subscriber callback, so whether `options` is
     * ever supplied depends on how the worker output is emitted - confirm.
     * @param {any} value Serialized data
     * @param {PushOptions} options Push options
     */
    private _onWorkerPush(value: any, options?: PushOptions): void {
        const deserializedFrame: DataFrame = DataSerializer.deserialize(value);
        this.emit('push', deserializedFrame, options);
    }
}