mjackson/firework

View on GitHub
Runner.js

Summary

Maintainability
A
3 hrs
Test Coverage
var d = require('describe-property');
var isFunction = require('./utils/isFunction');
var Worker = require('./Worker');

/**
 * Manages a pool of workers. The runner builds workers on demand through the
 * factory function it is given and keeps track of the ones it has created.
 *
 * The single argument must be a function that produces a new worker; anything
 * else is rejected with an Error.
 */
function Runner(createWorker) {
  var factoryIsCallable = isFunction(createWorker);

  if (!factoryIsCallable) {
    throw new Error('Runner#createWorker must be a function');
  }

  // User-supplied factory, invoked by Runner#createWorker.
  this._createWorker = createWorker;

  // Counter used to hand out worker ids; 0 means none issued yet.
  this.workerID = 0;

  // Workers currently owned by this runner.
  this.workers = [];
}

Object.defineProperties(Runner.prototype, {

  /**
   * Sets the number of workers for this runner and calls the given callback
   * when the operation is complete.
   *
   * When removing workers, the oldest ones are taken out of the pool
   * immediately and each is asked to stop; the callback receives an array of
   * the removed workers once every one of them has invoked its stop callback.
   * When adding workers (or when the size is unchanged), the new workers are
   * created, started and appended to the pool, and the callback receives an
   * array of the new workers (empty if nothing was added) before this method
   * returns.
   *
   * @param {number} numWorkers Desired pool size. Negative values are
   *   treated as 0.
   * @param {function} [callback] Optional. Ignored unless it is a function.
   */
  setNumberOfWorkers: d(function (numWorkers, callback) {
    numWorkers = Math.max(0, numWorkers);

    var workers = this.workers;
    var change = numWorkers - workers.length;

    if (change < 0) {
      // Remove oldest workers first.
      var removedWorkers = workers.splice(0, Math.abs(change));
      var numStoppedWorkers = 0;

      removedWorkers.forEach(function (worker) {
        worker.stop(function () {
          // Only report back once the last removed worker has stopped.
          if (++numStoppedWorkers === removedWorkers.length && isFunction(callback))
            callback(removedWorkers);
        });
      });

      return;
    }

    var newWorkers = [], worker;
    for (var i = 0; i < change; ++i) {
      // Each new worker gets the next sequential id from this runner.
      newWorkers.push(worker = this.createWorker(++this.workerID));
      worker.start();
    }

    workers.push.apply(workers, newWorkers);

    if (isFunction(callback))
      callback(newWorkers);
  }),

  /**
   * Adds the given number of workers to this runner.
   *
   * May be called as incrementWorkers(howMany, callback),
   * incrementWorkers(callback) or incrementWorkers(); when the count is
   * omitted it defaults to 1. The callback is forwarded to
   * setNumberOfWorkers.
   */
  incrementWorkers: d(function (howMany, callback) {
    if (isFunction(howMany)) {
      callback = howMany;
      howMany = 1;
    } else if (howMany == null) {
      // Without a default, `length + undefined` is NaN and the request
      // would silently do nothing.
      howMany = 1;
    }

    this.setNumberOfWorkers(this.workers.length + howMany, callback);
  }),

  /**
   * Removes the given number of workers from this runner.
   *
   * May be called as decrementWorkers(howMany, callback),
   * decrementWorkers(callback) or decrementWorkers(); when the count is
   * omitted it defaults to 1. The callback is forwarded to
   * setNumberOfWorkers.
   */
  decrementWorkers: d(function (howMany, callback) {
    if (isFunction(howMany)) {
      callback = howMany;
      howMany = 1;
    } else if (howMany == null) {
      // Without a default, `length - undefined` is NaN and the request
      // would silently do nothing.
      howMany = 1;
    }

    this.setNumberOfWorkers(this.workers.length - howMany, callback);
  }),

  /**
   * Alias for setNumberOfWorkers(0).
   */
  stopAllWorkers: d(function (callback) {
    this.setNumberOfWorkers(0, callback);
  }),

  /**
   * Creates a new worker that is bound to this runner.
   *
   * The worker is obtained from the factory passed to the Runner
   * constructor, which is called with the given id. An Error is thrown if
   * the factory does not return a Worker instance. The worker is wired so
   * that an "error" event triggers replaceWorker, and its "start", "failure"
   * and "idle" events are logged to the console. The worker is neither
   * started nor added to the pool here.
   */
  createWorker: d(function (id) {
    var worker = this._createWorker(id);

    if (!(worker instanceof Worker))
      throw new Error('Runner#createWorker must return a Worker');

    worker.on('error', this.replaceWorker.bind(this, worker));

    worker.on('start', function (job) {
      console.log('worker ' + id + ' started job ' + job._key);
    });

    worker.on('failure', function (job, error) {
      // String() yields the same text as error.toString() for real errors
      // but does not throw from inside the handler when error is null or
      // undefined.
      console.log('job ' + job._key + ' failed: ' + String(error));
    });

    worker.on('idle', function () {
      console.log('worker ' + id + ' is idle');
    });

    return worker;
  }),

  /**
   * Replaces the given worker with a new one. Does nothing if the worker is
   * not currently in this runner's pool.
   */
  replaceWorker: d(function (worker) {
    var workers = this.workers;
    var index = workers.indexOf(worker);

    if (index === -1)
      return;

    // Remove this worker and add a new one. When a worker emits
    // "error" it immediately stops working so no need to stop it.
    workers.splice(index, 1);
    this.incrementWorkers(1);
  }),

  /**
   * Returns a string representation of this Runner that includes the
   * current number of workers.
   */
  toString: d(function () {
    return '<Runner:' + this.workers.length + ' workers>';
  })

});

// The Runner constructor is this module's only export.
module.exports = Runner;