EkoCommunications/EkoQueue

View on GitHub
lib/job-runner.js

Summary

Maintainability
A
35 mins
Test Coverage
'use strict';

const debug = require('debug')('eko:EkoQueueDispatcher/JobRunner');
const path = require('path');
const co = require('co');

/**
 * Find the job class for a job name, instantiate it and return the instance.
 *
 * The module is looked up as `<jobPath>/<job>-job` and constructed with no
 * arguments. Any failure (module not resolvable, module throwing while
 * loading, constructor throwing) is logged and reported as `false` so the
 * caller can treat the job as unavailable.
 * @param  {string} job      Job name, used to build the module file name
 * @param  {string} jobPath  Directory that contains the job modules
 * @return {object|boolean}  The job instance, or false when it could not be created
 */
const instantiate = (job, jobPath) => {
  try {
    const jobModule = path.join(jobPath, `${job}-job`);
    const JobClass = require(jobModule);

    return new JobClass();
  } catch (error) {
    // require() reports an unresolvable module with code MODULE_NOT_FOUND.
    // Anything else means a module was found but is broken, so log the real
    // cause instead of claiming that the class does not exist.
    if (error && error.code === 'MODULE_NOT_FOUND') {
      debug(`No job class found for ${job}`);
    } else {
      debug(`Failed to instantiate job class for ${job}: ${error && error.message}`);
    }

    return false;
  }
};

/**
 * Run the handle method of a job instance and report the outcome via `done`.
 *
 * On success `done()` is called with no arguments. If handling fails, the
 * failed method of the instance is run and `done(error)` is then called with
 * the original error. `done(error)` is still called when the failed method
 * itself throws, rejects or is missing, so the job does not hang.
 * @param  {Object}   instance  Job instance providing `handle` and `failed`
 * @param  {Object}   job       Queued job; `job.data` is spread as the arguments
 *                              of both methods and `job.type` is used for logging
 * @param  {Function} done      Completion callback, called with the error on failure
 */
const runJob = (instance, job, done) => {
  co(function * () {
    yield instance.handle(...job.data);
    done();
  }).catch(error => {
    co(function * () {
      yield instance.failed(...job.data);
    }).catch(failedError => {
      // A broken failed handler must not prevent completion from being
      // reported; log it and carry on with the original error.
      debug(`Failed handler for job ${job.type} threw the error ${failedError && failedError.message}`);
    }).then(() => {
      done(error);

      debug(`Failed processing job ${job.type} with the error ${error.message}`);
    });
  });
};

/**
 * Constructor method for job instance
 * @param  {string} job
 * @param  {Object} jobProvider
 * @return {Object}
 */
module.exports = (job, jobProvider) => {
  let instance;

  /**
   * Check if job instance exists in job provider. If not
   * we check to see if jobPath exists in jobProvider. If it
   * does we then try to instantiate the module.
   */
  if (job in jobProvider) {
    instance = jobProvider[job];
  } else {
    if ('jobPath' in jobProvider) {
      instance = instantiate(job, jobProvider.jobPath);
    }
  }

  /**
   * If no instance exists we continue as normal
   */
  if (!instance) return;

  const priority = instance.priority || 0;
  const concurrency = instance.concurrency || 1;
  const run = (job, done) => runJob(instance, job, done);

  return { run, priority, concurrency };
};