samcday/node-clusterphone

View on GitHub
clusterphone.js

Summary

Maintainability
D
2 days
Test Coverage
"use strict";

var cluster = require("cluster"),
    Promise = require("bluebird"),
    semver  = require("semver"),
    pkg     = require("./package"),
    debug   = require("debug")("clusterphone:" + (cluster.isMaster ? "master" : "worker" + cluster.worker.id));

var clusterphone = require("./nucleus"),
    messageBus = require("./message-bus");

// If we're the first clusterphone to initialise, OR we're a newer version than
// the one that has already initialised, we use our message handler impl.
/* istanbul ignore else */ // <--- there is no else path, istanbul thinks there is. Bug?
if (!clusterphone.version || semver.gt(pkg.version, clusterphone.version)) {
  if (clusterphone.version) {
    debug("Older version (" + clusterphone.version + ") of clusterphone messageBus being replaced by " + pkg.version);
  }
  clusterphone.version = pkg.version;
  clusterphone.messageBus = messageBus;
}

var internalNamespace,
    internalNameGuard = 0;  // We use this to prevent callers from obtaining the "clusterphone" namespace.

// Build the object that we return from sendTo calls.
function constructMessageApi(namespace, cmd, destName, seq) {
  var api = {},
      valid = true,
      timeout = module.exports.ackTimeout,
      resolve,
      reject;

  var promise = new Promise(function(_resolve, _reject) {
    resolve = _resolve;
    reject = _reject;
  });

  setImmediate(function() {
    valid = false;
  });

  api.within = function(newTimeout) {
    if (!valid) {
      throw new Error("within() / acknowledged() calls are only valid immediately after sending a message.");
    }
    if (resolve.monitored) {
      throw new Error("within() must be called *before* acknowledged()");
    }
    if ("number" !== typeof newTimeout) {
      newTimeout = parseInt(newTimeout, 10);
    }
    if(!newTimeout) {
      throw new Error("Timeout must be a number");
    }
    timeout = newTimeout;
    return api;
  };

  api.acknowledged = function(cb) {
    if (!valid) {
      throw new Error("within() / acknowledged() calls are only valid immediately after sending a message.");
    }
    // This flag indicates that the caller does actually care about the resolution of this acknowledgement.
    resolve.monitored = true;
    return promise.timeout(timeout)
        .catch(Promise.TimeoutError, function() {
          // We retrieve the pending here to ensure it's deleted.
          namespace.getPending(seq);
          throw new Error("Timed out waiting for acknowledgement for message " + cmd + " with seq " + seq + " to " + destName + " in namespace " + namespace.interface.name);
        })
        .nodeify(cb);
  };

  api.ackd = api.acknowledged;

  return [api, resolve, reject];
}

// Gets our private data section from a worker object.
function getWorkerData(worker) {
  /* istanbul ignore if */
  if (!worker) {
    throw new TypeError("Trying to get private data for null worker?!");
  }

  var data = worker[clusterphone.workerDataAccessor];
  if (!data) {
    worker[clusterphone.workerDataAccessor] = data = {};
  }

  return data;
}

function namespaced(namespaceId) {
  if (namespaceId === "clusterphone") {
    if (internalNameGuard === 0) {
      throw new Error("The 'clusterphone' namespace name is private. Sorry.");
    }
    internalNameGuard--;
  }

  if (!namespaceId) {
    throw new TypeError("Name is required for namespaced messaging.");
  }

  var namespace = clusterphone.namespaces[namespaceId];
  if (namespace) {
    return namespace.interface;
  }

  debug("Setting up namespace for '" + namespaceId + "'");

  namespace = clusterphone.namespaces[namespaceId] = {
    interface: {}
  };
  namespace.interface.name = namespaceId;

  if (cluster.isMaster) {
    var getWorkerNamespacedData = function(worker) {
      var workerData = getWorkerData(worker);
      var namespacedData = workerData[namespaceId];
      if (!namespacedData) {
        workerData[namespaceId] = namespacedData = {
          seq: 1,
          pending: {},
          queued: [],
          parent: workerData
        };
      }

      return namespacedData;
    };

    namespace.getPending = function(seq) {
      var workerData = getWorkerNamespacedData(this);
      var pending = workerData.pending[seq];
      delete workerData.pending[seq];
      return pending;
    };

    var sendTo = function(worker, cmd, payload, fd, forceSendFD) {
      if (!worker) {
        throw new TypeError("Worker must be specified");
      }

      var workerData = getWorkerNamespacedData(worker);

      if (worker.state === "dead") {
        throw new Error("Worker is dead. Can't send message to it.");
      }

      cmd = cmd ? String(cmd) : cmd;
      if (!cmd) {
        throw new TypeError("Command is required.");
      }

      var canSend = workerData.parent.isReady;

      if (!canSend && fd && forceSendFD !== true) {
        throw new TypeError("You tried to send an FD to a worker that isn't online yet. " +
          "Whilst ordinarily I'd be happy to queue messages for you, deferring sending a descriptor could " +
          "cause strange behavior in your application.");
      }

      var seq = workerData.seq++,
          api = constructMessageApi(namespace, cmd, "worker " + worker.id, seq);

      workerData.pending[seq] = [api[1], api[2]];

      if (canSend) {
        debug("Sending message sequence " + seq + " " + cmd + " to worker " + worker.id);
        worker.send({
          __clusterphone: {
            ns: namespaceId,
            cmd: cmd,
            seq: seq,
            payload: payload
          }
        }, fd);
      } else {
        debug("Queueing message " + cmd + " to worker #" + worker.id);

        workerData.queued.push({
          cmd: cmd,
          seq: seq,
          payload: payload,
          api: api,
          fd: fd
        });
      }

      return api[0];
    };

    namespace.workerReady = function(worker) {
      var data = getWorkerNamespacedData(worker);
      data.parent.isReady = true;

      if (data.queued.length) {
        debug("We have messages in " + namespaceId + " queued for worker " + worker.id + ". Sending them now.");
      }

      while(data.queued.length) {
        var item = data.queued.shift();

        worker.send({
          __clusterphone: {
            ns: namespaceId,
            cmd: item.cmd,
            seq: item.seq,
            payload: item.payload
          }
        }, item.fd);
      }

      worker.emit("clusterphone:online");
    };

    // If a worker dies, fail any monitored ack deferreds.
    var cleanPending = function(worker) {
      var data = getWorkerNamespacedData(worker);

      Object.keys(data.pending).forEach(function(seqNum) {
        var item = data.pending[seqNum];
        delete data.pending[seqNum];
        if (item[0].monitored) {
          item[1](new Error("Undeliverable message: worker died before we could get acknowledgement"));
        }
      });
    };

    cluster.on("disconnect", cleanPending);

    namespace.interface.handlers = {};
    namespace.interface.sendTo = sendTo;
  } else {
    var pendings = {},
        seqCounter = 1;

    namespace.getPending = function(seq) {
      var pending = pendings[seq];
      delete pendings[seq];
      return pending;
    };

    var sendToMaster = function(cmd, payload, fd) {
      var seq = seqCounter++,
          api = constructMessageApi(namespace, cmd, "master", seq);

      cmd = cmd ? String(cmd) : cmd;
      if (!cmd) {
        throw new TypeError("Command is required.");
      }

      debug("Sending message sequence " + seq + " " + cmd + " to master.");

      pendings[seq] = [api[1], api[2]];

      try {
        process.send({
          __clusterphone: {
            ns: namespaceId,
            seq: seq,
            cmd: cmd,
            payload: payload
          }
        }, fd);
      } catch(e) {
        debug("Failed to send " + cmd + " in NS " + namespaceId + " to master.");
        debug(e);
        // We don't want to throw this synchronously. That's mean.
        // Instead we'll wait a tick and see if they registered an ack handler
        // for it.
        setImmediate(function() {
          /* istanbul ignore if */
          if (api[1].monitored) {
            api[2](e);
          }
        });
      }

      return api[0];
    };

    // If this ISN'T the internal namespace, we send a message to master
    // indicating that we are now ready for any queued messages for this ns.
    if (namespaceId !== "clusterphone") {
      internalNamespace.sendToMaster("workerReady", {
        targetNs: namespaceId
      });
    }

    namespace.interface.handlers = {};
    namespace.interface.sendToMaster = sendToMaster;
  }

  return namespace.interface;
}

internalNameGuard++;
internalNamespace = namespaced("clusterphone");

var defaultNamespace = namespaced("_");

if (cluster.isMaster) {
  internalNamespace.handlers.workerReady = function(worker, msg) {
    var targetNsName = msg.targetNs;

    /* istanbul ignore if */
    if (!targetNsName) {
      return;
    }

    var targetNs = clusterphone.namespaces[msg.targetNs];

    if (!targetNs) {
      debug("WEIRD: worker said it was ready for " + targetNsName + ", but we don't have that namespace.");
      return;
    }

    debug("Worker #" + worker.id + " is ready to receive messages for " + targetNsName);
    targetNs.workerReady(worker);
  };
}

module.exports = defaultNamespace;
module.exports.ns = namespaced;
module.exports.ackTimeout = 10 * 1000;  // 10 seconds by default.