ngryman/ribs

View on GitHub
lib/pipeline.js

Summary

Maintainability
B
4 hrs
Test Coverage
/*!
 * ribs
 * Copyright (c) 2013-2014 Nicolas Gryman <ngryman@gmail.com>
 * LGPL Licensed
 */

'use strict';

/**
 * Module dependencies.
 */

var util = require('util'),
    EventEmitter = require('events').EventEmitter,
    _ = require('lodash'),
    async = require('async'),
    utils = require('./utils'),
    check = utils.checkType,
    hooks = require('./hooks'),
    createStream = require('./stream').createStream;

/**
 * The `Pipeline` object provides a unified API to hold and execute consecutive operations to be applied to an image.
 *
 * The idea here is *lazy evaluation*. This means that the user can chain multiple operation with the fluent API of RIBS
 * but only execute them when he decides. A *pipeline* will then execute every operations in order and asynchronously.
 * This has the advantage of *batching* file operations and avoid back and forth between disk and memory.
 *
 * A pipeline instance is return by ribs front-end in order to enqueue operations directly. Even if it's exposed to the
 * end-user, some `Pipeline` methods are reserved for custom operations only, and should not manipulated directly.
 *
 * The end-user could bypass ribs front-end and use directly a pipeline to manipulate images but he would have to be
 * aware that the front-end configures the pipeline in a special way that make ribs and it's ecosystem work properly.
 *
 * @constructor
 */
function Pipeline() {
    // shortcut syntax
    if (!(this instanceof Pipeline)) return new Pipeline();
    // queue of async operations to by applied in FIFO order
    this.queue = [];
    // shared params
    this.sharedParams = {};
}

util.inherits(Pipeline, EventEmitter);

/**
 *
 * @param name
 * @param [params]
 * @returns {Pipeline}
 */
Pipeline.prototype.use = function(name, params) {
    // bypass on error
    if (this.error) return this;

    try {
        // `use(bulk, [callback]`

        // call `use` once per bulk and `done` if a `callback` is provided
        if (Array.isArray(name)) {
            name.forEach(function(bulk) {
                // inline operation?
                if ('function' == typeof bulk)
                    this.use(bulk);
                // classic
                else
                    this.use(bulk.operation, bulk.params);
            }.bind(this));

            // a callback being specified is simply a shortcut for `done`
            // we resolve the pipeline directly
            if ('function' == typeof params) return this.done(params);
            return this;
        }

        // `use(name|operation, [params])`

        // check for valid name types
        // this can be a string to a valid registered operation or an inline operation
        utils.checkType('name', name, false, 'string', 'function');

        var operation,
            queue = this.queue;

        // inline operation?
        if ('function' == typeof name)
            operation = name;
        // classic
        else {
            operation = Pipeline.operations[name];
            if ('function' != typeof operation)
                throw new Error('no operation found: ' + name);
        }

        // if no params is specified, we link to the pipeline shared param object
        // this gives the ability to share data between some operations
        // note: might be as powerful as useless, we'll see...
        params = params || this.sharedParams;

        // `from` and `to` must not have duplicates in the queue
        if (~'from|to'.indexOf(name))
            lock(queue, name);

        // configures operation
        var configuredOperation = invokeOperation.bind(this, operation, params);

        // mark it
        mark(configuredOperation, name);

        // `from` is always inserted at the top of the queue
        if ('from' == name)
            queue.unshift(configuredOperation);
        // push the operation to the queue
        else
            queue.push(configuredOperation);
    }
    catch (err) {
        err.args = { name: name, params: params };
        this.error = err;
        return this;
    }

    return this;
};

/**
 *
 * @param {function} [callback]
 */
Pipeline.prototype.done = function(callback) {
    // ensure we defer this
    // this also ensure that we can subscribe to events after calling this
    setImmediate(function() {
        var queue = this.queue;

        // ensure `to` is the last
        ensureLast(queue);

        // `start` event
        this.emit('start');

        // bypass on error
        if (this.error) {
            finalize.call(this, callback);
            return this;
        }

        // unleash the Kraken!
        // hum, if it's too dangerous... take this ascii sword with you!
        //
        //   o()xxxx[{::::::::::::::::::::::::::>
        //
        // Kraken will *waterfall* calls to operations where each operation passes its results to the next one.
        // There is no additional logic to async's default behavior, because there is no obvious need to.
        // However that means that a *buggy* operation could break the chain if it does not call correctly the `next`
        // argument or simply corrupt data. There is a strong coupling between all operations involved.
        // Developers of *custom operation* are in charge of testing them well.
        async.waterfall(queue, finalize.bind(this, callback));
    }.bind(this));

    return this;
};

/**
 *
 */
Pipeline.prototype.clear = function() {
    this.queue.length = 0;
    this.sharedParams = {};
    this.error = undefined;
    return this;
};

/**
 *
 * @return {Stream}
 */
Pipeline.prototype.stream = function(params) {
    return createStream(this, params);
};

/**
 *
 * @param name
 * @param operation
 */
Pipeline.add = function(name, operation) {
    // add the new operation
    Pipeline.operations[name] = operation;

    // proxy it via a new Pipeline method
    Pipeline.prototype[name] = function(params) {
        return this.use(name, params);
    };
};

/**
 * Installs or retrieve a hook for a given operation and filename pattern.
 *
 * @param {string} operation - Operation for which the hook will be installed.
 * This can be one of the following values:
 *  - shrink
 * @param {string} name - Hook name. Each operation can have multiple hooks for different purpose. Depending of the
 * operation, this can be one of the following values:
 *  - constraints
 * @param {function} [hook] - The hook function that will be called. Depending on the operation, its signature may depend.
 * See each default hooks for more details.
 * @return {function|undefined}
 */
Pipeline.hook = function(operation, name, hook) {
    // arguments type
    check('operation', operation, false, 'string');
    check('name', name, false, 'string');
    check('hook', hook, true, 'function');

    var key = operation + ':' + name;

    // registers hook
    if (hook) {
        var hooksPool = (Pipeline.hooks[key] = Pipeline.hooks[key] || []);
        hooksPool.arity = hooksPool.arity || hook.length;
        Pipeline.hooks[key] = hook;
    }
    // retrieve hook
    else
        return Pipeline.hooks[key];
};

/**
 *
 * @type {Object}
 */
Pipeline.operations = {};

/**
 *
 * @type {Object}
 */
Pipeline.hooks = {};

/**
 *
 * @private
 * @param callback
 * @param err
 * @param res
 */
function finalize(callback, err, res) {
    // cache error locally
    err = err || this.error;

    // clear before invoking callback
    this.clear();

    // `error` event
    if (err) {
        // add an empty listener to `error` event.
        // EventEmitter throws an error when there is no listener for this particular event.
        // We don't want that!
        //   http://nodejs.org/api/events.html
        this.on('error', utils.noop);

        this.emit('error', err);
    }
    // `success` event
    else
        this.emit('success', res);

    // `end` event
    this.emit('end', err, res);

    // invoke our precious end-user callback with associated (non)-error and result
    if ('function' == typeof callback) callback(err, res);
}

/**
 *
 * @private
 * @param operation
 * @param params
 * @param image
 * @param callback
 */
function invokeOperation(operation, params, image, callback) {
    // `from` operation has only a callback as argument
    var fromOp = 'function' == typeof image,
        finalParams;

    // `operation:before` event
    this.emit('operation:before', operation.name, params);

    // juggle arguments for `from` operation
    if (fromOp) callback = image;

    // wrap the callback to emit the `after` event
    var wrappedCallback = function() {
        // `operation:after` event
        this.emit('operation:after', operation.name, finalParams);

        // invoke the original callback transparently
        callback.apply(this, arguments);
    }.bind(this);

    // invoke operation
    finalParams = operation.call(this,
        params,
        fromOp ? wrappedCallback : image,
        fromOp ? undefined : wrappedCallback);
}

function lock(queue, name) {
    if (queue['_' + name + 'Lock'])
        throw new Error('duplicate of ' + name + ' found');

    queue['_' + name + 'Lock'] = true;
}

function mark(operation, name) {
    operation._name = name.name || name;
}

function ensureLast(queue) {
    var len = queue.length,
        i = len - 1;

    // seek and d...
    for (; i >= 0 && 'to' != queue[i]._name; i--) ;

    // `to` does not exist or is the last
    if (i < 0 || i == len - 1) return;

    var tmp = queue[len - 1];
    queue[len - 1] = queue[i];
    queue[i] = tmp;
}

/**
 * Register default hooks.
 */
Pipeline.hook('resize', 'constraints', hooks.resizeConstraintsHook);
Pipeline.hook('crop', 'constraints', hooks.cropConstraintsHook);

/**
 * Export.
 */

module.exports = Pipeline;