asa-git/forked-worker-pool

View on GitHub
lib/index.js

Summary

Maintainability
B
7 hrs
Test Coverage
/**
 * Main module to create a pool of Forked Workers.
 */

//TODO: replace the array Pool._jobs.prending with a queue for performance.
//TODO: see how to implement an optional timout for the started state of workers.

var util = require('util');
var fs = require('fs');
var path = require('path');
var EventEmitter = require('events').EventEmitter;
var _ForkedWorker = require('./_ForkedWorker');


function isInteger(n) {
    return (n === +n && n === (n|0));
}
function isStrictObject(obj) {
    return (typeof obj === 'object' && obj !== null &&  !Array.isArray(obj));
}
function isNonEmptyString(str) {
    return (typeof str === 'string' && str.trim().length>0);
}
function fileExists(pathname) {
    try {
        return fs.statSync(/\.js$/i.test(pathname) ? pathname : pathname + '.js').isFile();
    } catch(err) {
        return false;
    }
}


/**
 * Utility method to filter and validate the configuration of the pool
 */
function filterConfig(config) {

    // check the required parameters
    if (!isStrictObject(config))                    { throw new TypeError('Expecting a valid config object'); }
    if (!isNonEmptyString(config.path))                { throw new Error('Expecting a path for the workers module'); }
    if (!path.isAbsolute(config.path))                { throw new Error('Expecting an absolute path for the workers module'); }
    if (!fileExists(config.path))                    { throw new Error('Expecting an existing file for the workers module'); }
    if (!isInteger(config.size) || config.size<=0)    { throw new Error('Expecting an integer>0 for the number of workers to be associated with this pool'); }

    // check the optional parameters
    if ('autoStart' in config && typeof config.autoStart!=='boolean')    { throw new TypeError('Expecting a boolean for the autoStart parameter'); }
    if ('silent' in config && typeof config.silent!=='boolean')            { throw new TypeError('Expecting a boolean for the silent parameter'); }

    if ('coverage' in config) {
        if (!isStrictObject(config.coverage))            { throw new TypeError('Expecting a valid coverage object'); }
        if (!isNonEmptyString(config.coverage.path))    { throw new Error('Expecting a valid path for the coverage module'); }
        if (config.coverage.args && !Array.isArray(config.coverage.args)) {
            throw new TypeError('Expecting an array for the coverage args parameter');
        }
    }

    return {
        fork: {
            path: (config.coverage ? config.coverage.path : config.path),
            args: (config.coverage ? (config.coverage.args || []).concat(config.path) : []),
            options: { silent: ('silent' in config ? config.silent : true) }
        },
        size: config.size,
        autoStart: (config.autoStart || false)
    };
}

/**
 * Constructor of the pool.
 */
function Pool(config) {
    this._conf = filterConfig(config);
    this._workers = {    instances: [],
                        idle: [],
                        busy: [] };

    this._jobs = {        processedCount: 0,
                        failedCount: 0,
                        assigned: [],
                        pending: [] };

    if (this._conf.autoStart) {
        this.start();
    }
}
util.inherits(Pool, EventEmitter);

function removeFromArray(array, valueOrFunction) {
    var index = -1;
    if (typeof valueOrFunction === 'function') {
        array.some( function(value, idx) { if (valueOrFunction(value)) { index = idx; return true; } });
    } else {
        index = array.indexOf(valueOrFunction);
    }
    return (index<0 ? null : array.splice(index, 1)[0]);
}
Pool.prototype._hasWorker = function(worker) {
    return (this._workers.instances.indexOf(worker)>=0);
};
Pool.prototype._removeWorker = function(worker) {
    // Remove the reference of the worker
    removeFromArray(this._workers.instances, worker);
    removeFromArray(this._workers.idle, worker);
    removeFromArray(this._workers.busy, worker);
    
    // make sure assign job to this worker is placed back in the queue
    var job = removeFromArray(this._jobs.assigned, function(job) { return (job.worker===worker); });
    if (job) {
        job.worker = null;
        this._jobs.pending.push(job);
    }
};
Pool.prototype._dispatchJobs = function() {
    while (this._workers.idle.length>0 && this._jobs.pending.length>0) {
        var worker = this._workers.idle.shift();
        var job = this._jobs.pending.shift();
        job.worker = worker;
        this._workers.busy.push(worker);
        this._jobs.assigned.push(job);
        worker.send(job.data);
    }
};

Pool.prototype.start = function() {
    var self = this;
    function removeJob(worker, wasSuccessfull) {
        var job = removeFromArray(self._jobs.assigned, function(job) { return (job.worker===worker); });
        removeFromArray(self._workers.busy, worker);
        if (self._hasWorker(worker)) {
            self._workers.idle.push(worker);
            if (wasSuccessfull) {
                self._jobs.processedCount++;
            } else {
                self._jobs.failedCount++;
            }
            self._dispatchJobs();
        }
        return job;
    }
    function createWorker(config) {
        return new _ForkedWorker(config)
                .on('started', function(worker) {
                    if (self._hasWorker(worker)) {
                        self._workers.idle.push(worker);
                        self.emit('started', self);
                        self._dispatchJobs();
                    }
                })
                .on('data', function(worker, data) {
                    var job = removeJob(worker, true);
                    if (job.callbacks.pass) {
                        job.callbacks.pass(self, job.data, data);
                    } else {
                        self.emit('data', self, job.data, data);
                    }
                })
                .on('error', function(worker, err) {
                    var job = removeJob(worker, false);
                    function requeue(data) {
                        self.send(data || job.data, job.callbacks.pass, job.callbacks.fail);
                    }
                    if (job.callbacks.fail) {
                        job.callbacks.fail(self, err, job.data, requeue);
                    } else {
                        self.emit('error', self, err, job.data, requeue);
                    }

                })
                .on('disconnected', function(worker) {
                    self._removeWorker(worker);
                    self.emit('disconnected', self);
                })
                .on('exit', function(worker) {
                    self._removeWorker(worker);
                    self.emit('exit', self);
                });
    }
    while(this._workers.instances.length<this._conf.size) {
        this._workers.instances.push(createWorker(this._conf.fork));
    }
    return this;
};

Pool.prototype.releaseIdle = function(count) {
    var nbItems = (    isInteger(count) ?
                    Math.min(this._workers.idle.length, Math.max(count,0)) : 
                    this._workers.idle.length );

    this._workers.idle.splice(0, nbItems).forEach(function(worker) { worker.disconnect(); });
    return nbItems;
};

Pool.prototype.releaseAll = function() {
    this._workers.instances.forEach(function(worker) { worker.disconnect(); });
    this._workers.instances = [];
};

Pool.prototype.send = function(data, passCallback, failCallback) {
    this._jobs.pending.push({ data: data, callbacks: {pass: passCallback, fail: failCallback}});
    this._dispatchJobs();
    return this;
};

Pool.prototype.getStatus = function() {
    return {
        workers: {
            created: this._workers.instances.length,
            idle: this._workers.idle.length,
            busy: this._workers.busy.length
        },
        jobs: {
            processed: this._jobs.processedCount,
            failed: this._jobs.failedCount,
            assigned: this._jobs.assigned.length,
            pending: this._jobs.pending.length
        }
    };
};

var ForkedWorker = require('./ForkedWorker.js');

module.exports = { Pool: Pool, ForkedWorker: ForkedWorker };