OpenHPS/openhps-csv

View on GitHub
src/CSVDataSink.ts

Summary

Maintainability
A
0 mins
Test Coverage
import { DataFrame, SinkNode, SinkNodeOptions } from '@openhps/core';
import * as csv from 'csv-writer';
import { CsvWriter } from 'csv-writer/src/lib/csv-writer';
import { ObjectMap } from 'csv-writer/src/lib/lang/object';
import { ObjectStringifierHeader } from 'csv-writer/src/lib/record';

/**
 * CSVDataSink
 *
 * @category Sink node
 */
export class CSVDataSink<In extends DataFrame> extends SinkNode<In> {
    private _writeCallback: (frame: In) => DataFrame;
    private _file: string;
    private _csvWriter: CsvWriter<ObjectMap<any>>;
    private _header: ObjectStringifierHeader;
    private _writeQueue: any[] = [];
    private _writeReady = true;
    private _timeout: NodeJS.Timeout;
    private _pending: Array<() => void> = [];

    constructor(
        file: string,
        header: Array<{ id: string; title: string }>,
        writeCallback: (frame: In) => any,
        options?: SinkNodeOptions,
    ) {
        super(options);
        this._header = header;
        this._writeCallback = writeCallback;
        this._file = file;

        this.once('build', this._initCSV.bind(this));
    }

    private _initCSV(): Promise<void> {
        return new Promise((resolve) => {
            this._csvWriter = csv.createObjectCsvWriter({
                path: this._file,
                header: this._header,
            });
            resolve();
        });
    }

    public onPush(data: In): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            this._writeQueue.push(this._writeCallback(data));
            this._handleQueue().then(resolve).catch(reject);
        });
    }

    private _handleQueue(): Promise<void> {
        return new Promise((resolve, reject) => {
            if (this._writeReady) {
                this._writeReady = false;
                const queue = new Array(...this._writeQueue);
                this._writeQueue = [];
                if (this._timeout) {
                    clearTimeout(this._timeout);
                    this._timeout = undefined;
                }
                this._csvWriter
                    .writeRecords(queue)
                    .then(() => {
                        this._pending.forEach((pending) => pending());
                        this._pending.slice(0, this._pending.length);
                        resolve();
                    })
                    .catch(reject)
                    .finally(() => {
                        this._writeReady = true;
                    });
            } else {
                this._timeout = setTimeout(this._handleQueue.bind(this), 100);
                this._pending.push(resolve);
            }
        });
    }
}