albe/node-event-storage

View on GitHub
src/Consumer.js

Summary

Maintainability
A
0 mins
Test Coverage
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const { assert, ensureDirectory } = require('./util');

const Storage = require('./Storage/ReadableStorage');
const MAX_CATCHUP_BATCH = 10;

/**
 * Remove a file from disk, treating an already-missing file as success.
 * Any failure other than "file not found" is passed on to the caller.
 * @param {string} filename Path of the file to delete.
 */
const safeUnlink = (filename) => {
    /* istanbul ignore next */
    try {
        fs.unlinkSync(filename);
    } catch (unlinkError) {
        // A missing file is exactly the end state we want, so it is not an error here.
        const alreadyGone = unlinkError.code === "ENOENT";
        if (alreadyGone) {
            return;
        }
        throw unlinkError;
    }
};

/**
 * Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState().
 *
 * The consumer is an object-mode Readable stream. Its position inside the consumed index and its state
 * are stored together in a single file inside a `consumers` directory below the storage's index directory.
 */
class Consumer extends stream.Readable {

    /**
     * @param {Storage} storage The storage to create the consumer for.
     * @param {string} indexName The name of the index to consume.
     * @param {string} identifier The unique name to identify this consumer.
     * @param {object} [initialState] The initial state of the consumer.
     * @param {number} [startFrom] The revision to start from within the index to consume.
     */
    constructor(storage, indexName, identifier, initialState = {}, startFrom = 0) {
        super({ objectMode: true });

        assert(storage instanceof Storage, 'Must provide a storage for the consumer.');
        assert(typeof indexName === 'string' && indexName !== '', 'Must specify an index name for the consumer.');
        assert(typeof identifier === 'string' && identifier !== '', 'Must specify an identifier name for the consumer.');

        this.initializeStorage(storage, indexName, identifier);
        this.restoreState(initialState, startFrom);
        // Keep one bound reference so the very same function can be removed from the storage again in stop().
        this.handler = this.handleNewDocument.bind(this);
        // After a stream error no document is being handled any more, hence setState() must be rejected.
        this.on('error', () => (this.handleDocument = false));
    }

    /**
     * Open the index to consume and derive the file name the consumer state is persisted to.
     *
     * @private
     * @param {Storage} storage The storage to create the consumer for.
     * @param {string} indexName The name of the index to consume.
     * @param {string} identifier The unique name to identify this consumer.
     */
    initializeStorage(storage, indexName, identifier) {
        this.storage = storage;
        this.index = this.storage.openIndex(indexName);
        this.indexName = indexName;
        const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
        this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
        // NOTE(review): ensureDirectory() presumably returns truthy when the directory already existed,
        // in which case temporary files of earlier failed writes may be left over - confirm in ./util.
        if (ensureDirectory(consumerDirectory)) {
            this.cleanUpFailedWrites();
        }
    }

    /**
     * Iterate over all files in the directory of this consumer and unlink any file that starts with the filename followed by a dot.
     * Those are the temporary files created by persist() (`<fileName>.<position>`).
     * @private
     */
    cleanUpFailedWrites() {
        const consumerNamePrefix = path.basename(this.fileName) + '.';
        const consumerDirectory = path.dirname(this.fileName);
        const files = fs.readdirSync(consumerDirectory);
        for (let file of files) {
            if (file.startsWith(consumerNamePrefix)) {
                safeUnlink(path.join(consumerDirectory, file));
            }
        }
    }

    /**
     * Restore position and state from the consumer file, falling back to the given defaults
     * if the file can not be read or parsed.
     *
     * @private
     * @param {object} initialState The initial state if no persisted state exists.
     * @param {number} startFrom The revision to start from within the index to consume.
     */
    restoreState(initialState, startFrom) {
        /* istanbul ignore if */
        if (!this.fileName) {
            return;
        }
        // Allow passing only a start position in place of the initial state.
        if (typeof initialState === 'number') {
            startFrom = initialState;
            initialState = {};
        }
        try {
            // File layout: 4 bytes little endian position, followed by the JSON encoded state.
            const consumerData = fs.readFileSync(this.fileName);
            this.position = consumerData.readInt32LE(0);
            this.state = JSON.parse(consumerData.toString('utf8', 4));
        } catch (e) {
            this.position = startFrom;
            this.state = initialState;
        }
        Object.freeze(this.state);

        this.persisting = null;
        this.consuming = false;
    }

    /**
     * Update the state of this consumer transactionally with the position.
     * May only be called from within the document handling callback.
     *
     * @param {object|function(object):object} newState The new state or a function deriving it from the current state.
     * @param {boolean} [persist] Set to false if this state update should not be persisted yet
     * @api
     */
    setState(newState, persist = true) {
        assert(this.handleDocument, 'Called setState outside of document handler!');

        if (typeof newState === 'function') {
            newState = newState(this.state);
        }
        this.state = Object.freeze(newState);
        this.doPersist = persist;
    }

    /**
     * Handler method that is supposed to be triggered for each new document in the storage.
     *
     * @private
     * @param {string} name The name of the index the document was added for.
     * @param {number} position The 1-based position inside the index that the document was added to.
     * @param {object} document The document that was added.
     */
    handleNewDocument(name, position, document) {
        if (name !== this.indexName) {
            return;
        }

        // Only accept the document that directly follows the last consumed position.
        /* istanbul ignore if */
        if (this.position !== position - 1) {
            return;
        }

        this.handleDocument = true;
        this.once('data', () => (this.handleDocument = false));
        if (!this.push(document)) {
            // The stream buffer is full, stop until the consumer is started again.
            this.stop();
        }
        this.position = position;
        if (this.doPersist) {
            this.persist();
        }
    }

    /**
     * Persist current state of this consumer.
     * This will write the current position and state to the consumer storage file.
     * The write is deferred with setImmediate(), so several calls within one tick result in a single write.
     * Emits 'persisted' with the JSON encoded state after a successful write.
     *
     * @private
     */
    persist() {
        if (this.persisting) {
            return;
        }
        this.persisting = setImmediate(() => {
            const consumerState = JSON.stringify(this.state);
            // The buffer has to be sized by the encoded byte length, not the string length,
            // otherwise any state containing multi-byte characters gets truncated on disk.
            const consumerStateSize = Buffer.byteLength(consumerState, 'utf8');
            const consumerData = Buffer.allocUnsafe(4 + consumerStateSize);
            consumerData.writeInt32LE(this.position, 0);
            consumerData.write(consumerState, 4, consumerStateSize, 'utf8');
            const tmpFile = this.fileName + '.' + this.position;
            this.persisting = null;
            /* istanbul ignore if */
            if (fs.existsSync(tmpFile)) {
                throw new Error(`Trying to update consumer ${this.fileName} concurrently. Keep each single consumer within a single process.`);
            }
            try {
                fs.writeFileSync(tmpFile, consumerData);
                // If the write fails (half-way), the consumer state file will not be corrupted
                fs.renameSync(tmpFile, this.fileName);
                this.emit('persisted', consumerState);
            } catch (e) {
                // Best effort: a failed write only leaves the previously persisted state in place.
                /* istanbul ignore next */
                safeUnlink(tmpFile);
            }
        });
    }

    /**
     * Check if this consumer has caught up. If so, register a handler for the stream and emit a 'caught-up' event.
     *
     * @private
     * @returns {boolean} True if this consumer has caught up or is not consuming any more, i.e. no further catch-up batch should be read.
     */
    checkCaughtUp() {
        if (this.index.length <= this.position) {
            this.handleDocument = false;
            // From now on documents arrive through the storage's 'index-add' event.
            this.storage.on('index-add', this.handler);
            this.emit('caught-up');
            return true;
        }
        return (this.consuming === false);
    }

    /**
     * Consume (push) a number of documents and update the position record.
     * Stops the consumer and leaves the remaining documents unconsumed as soon as the stream signals back-pressure.
     *
     * @private
     * @param {Array|Generator} documents The list or a stream of documents to consume
     */
    consumeDocuments(documents) {
        for (let document of documents) {
            if (!this.push(document)) {
                this.stop();
                break;
            }
            ++this.position;
        }
    }

    /**
     * Start consuming documents.
     *
     * This will also catch up from the last position in case new documents were added.
     * @api
     */
    start() {
        if (this.isPaused()) {
            this.resume();
        }
        if (this.consuming) {
            return;
        }
        this.consuming = true;
        this.handleDocument = true;

        // Catch up to current index position
        const catchUpBatch = () => {
            setImmediate(() => {
                if (this.checkCaughtUp()) {
                    return;
                }

                const maxBatchPosition = Math.min(this.position + MAX_CATCHUP_BATCH + 1, this.index.length);
                const documents = this.storage.readRange(this.position + 1, maxBatchPosition, this.index);
                this.consumeDocuments(documents);
                // Only continue with the next batch once the position of this batch has been persisted.
                this.once('persisted', () => catchUpBatch());
                this.persist();
            });
        };
        catchUpBatch();
    }

    /**
     * Stop consuming new documents. Consuming can be started again at any time.
     * @api
     */
    stop() {
        if (this.consuming) {
            this.pause();
        }
        this.storage.removeListener('index-add', this.handler);
        this.consuming = false;
        this.handleDocument = false;
    }

    /**
     * Reset this consumer to restart processing all documents again.
     * NOTE: This will overwrite the current state of the consumer and hence be destructive.
     * @param {object} [initialState] The initial state of the consumer.
     * @param {number} [startFrom] The revision to start from within the index to consume.
     * @api
     */
    reset(initialState = {}, startFrom = 0) {
        // Allow passing only a start position in place of the initial state.
        if (typeof initialState === 'number') {
            startFrom = initialState;
            initialState = {};
        }
        const restart = this.consuming;
        this.stop();
        this.state = Object.freeze(initialState);
        this.position = startFrom;
        this.persist();
        if (restart) {
            this.start();
        }
    }

    // noinspection JSUnusedGlobalSymbols
    /**
     * Readable stream implementation. Starts consuming unless the stream has been paused.
     * @private
     */
    _read() {
        if (this.isPaused()) {
            return;
        }
        this.start();
    }
}

module.exports = Consumer;