mjackson/firework

View on GitHub
Worker.js

Summary

Maintainability
A
1 hr
Test Coverage
var d = require('describe-property');
var EventEmitter = require('events').EventEmitter;
var isFunction = require('./utils/isFunction');
var Queue = require('./Queue');

/**
 * A worker is responsible for performing jobs, one at a time, as quickly as it can
 * pull them off a given queue. When it gets a new job, it calls the performJob
 * function that is given in the constructor. This function is called with the current
 * job and a node-style callback argument.
 *
 * Workers keep track of how many jobs they have done in the `failureCount` and
 * `successCount` instance variables.
 *
 * Workers emit the following events:
 *
 *   - error          Called when there is a network/server error.
 *   - start          Receives a "job" argument immediately before a new job
 *                    is about to be performed.
 *   - failure        Receives "job" and "error" arguments when a job fails
 *                    for some reason.
 *   - success        Receives a "job" argument when a job finishes successfully.
 *   - finish         Receives a "job" argument when a job finishes, regardless
 *                    of success or failure.
 *   - idle           Called when this worker stops working.
 */
function Worker(queue, performJob) {
  EventEmitter.call(this);

  if (!isFunction(performJob))
    throw new Error('Worker#performJob must be a function');

  this.performJob = performJob;

  if (!(queue instanceof Queue))
    queue = new Queue(queue);

  this.queue = queue;
  this.failureCount = 0;
  this.successCount = 0;
  this.isBusy = false;

  queue.setupWorker(this);
}

Worker.prototype = Object.create(EventEmitter.prototype, {

  constructor: d(Worker),

  /**
   * Starts this worker.
   */
  start: d(function () {
    if (this._query)
      return;

    var query = this.queue.createQuery();

    // The "child_added" event sometimes gets fired more often than it should. It
    // seems the Firebase client errs on the side of emitting extraneous events,
    // which isn't necessarily bad. We just need to know that.
    query.on('child_added', function (snapshot) {
      this._nextSnapshot = snapshot;
      this._tryToWork();
    }, this);

    this._query = query;
  }),

  /**
   * Stops this worker and calls the given callback after finishing
   * any work that is currently in progress.
   */
  stop: d(function (callback) {
    if (this._query) {
      this._query.off();
      this._query = null;
    }

    if (!isFunction(callback))
      return;

    if (this.isBusy) {
      this.once('idle', callback);
    } else {
      callback();
    }
  }),

  /**
   * Starts working on the given job.
   */
  startJob: d(function (job) {
    this.emit('start', job);

    // Perform the job. Guard against misbehaving performJob
    // functions that call the callback more than once.
    var alreadyCalled = false;
    var self = this;
    function finishJob(error) {
      if (alreadyCalled) {
        console.error('Error: The callback given to performJob was called more than once!');
      } else {
        alreadyCalled = true;
        self.finishJob(job, error);
      }
    }

    try {
      this.performJob(job, finishJob);
    } catch (error) {
      finishJob(error);
    }
  }),

  /**
   * Finishes the given job.
   */
  finishJob: d(function (job, error) {
    if (error) {
      this.failureCount += 1;
      this.emit('failure', job, error);
    } else {
      this.successCount += 1;
      this.emit('success', job);
    }

    this.emit('finish', job);

    this._getNextJob(job);
  }),

  _getNextJob: d(function (previousJob) {
    this.isBusy = false;

    if (this._nextSnapshot) {
      this._tryToWork(previousJob);
    } else if (previousJob) {
      this.emit('idle');
    }
  }),

  _tryToWork: d(function (previousJob) {
    if (!this._nextSnapshot || this.isBusy)
      return;

    this.isBusy = true;

    var ref = this._nextSnapshot.ref();
    this._nextSnapshot = null;

    var nextJob;
    function claimJob(job) {
      nextJob = job;

      // Remove this job from the queue.
      return null;
    }

    var self = this;
    function onComplete(error, committed) {
      if (error) {
        // We may be in a bad state here. Notify listeners and stop working.
        self.emit('error', error);
      } else if (committed && nextJob) {
        // Ensure the job has a key.
        if (!nextJob._key)
          nextJob._key = ref.key();

        // We successfully claimed a job. Start working on it.
        self.startJob(nextJob);
      } else {
        // Another worker claimed the job. Get the next one.
        self._getNextJob(previousJob);
      }
    }

    ref.transaction(claimJob, onComplete, false);
  }),

  /**
   * Returns a string representation of this Worker.
   */
  toString: d(function () {
    return '<Worker:' + this.queue + '>';
  })

});

module.exports = Worker;