doublesharp/lru-cache-for-clusters-as-promised

View on GitHub
lib/master.js

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
const cluster = require('cluster');
const Debug = require('debug');
const LRUCache = require('lru-cache');

const utils = require('./utils');
const config = require('../config');

const masterMessages = require('./master-messages');

const debug = new Debug(`${config.source}-master`);
const messages = new Debug(`${config.source}-messages`);

// lru caches by namespace on the master
const caches = {};

// this code will only run on the master to set up handles for messages from the workers
if (cluster.isMaster) {
  // for each worker created...
  cluster.on('fork', (worker) => {
    // wait for the worker to send a message
    worker.on('message', (request) => {
      if (request.source !== config.source) return;
      messages(`Master received message from worker ${worker.id}`, request);
      return masterMessages.getMessageHandler(request.func)(
        caches,
        request,
        worker
      );
    });
  });
}

function getLruCache(caches, namespace, options) {
  let lru = caches[namespace];
  if (!lru || lru instanceof LRUCache === false) {
    lru = caches[namespace] = new LRUCache(options);
    debug(`Created new LRUCache for namespace '${namespace}'`);
  }

  utils.handlePruneCronJob(lru, options.prune, namespace);

  return lru;
}

const getOrSetConfigValue = async ({
  caches,
  namespace,
  options,
  func: property,
  funcArgs: value,
}) => {
  const lru = getLruCache(caches, namespace, options);
  if (value[0]) {
    lru[property] = value[0];
  }
  return lru[property];
};

const incrementOrDecrement = async ({
  caches,
  namespace,
  options,
  func,
  funcArgs,
}) => {
  const lru = getLruCache(caches, namespace, options);
  // get the current value default to 0
  let value = lru.get(funcArgs[0]);
  // maybe initialize and increment
  value =
    (typeof value === 'number' ? value : 0) +
    (funcArgs[1] || 1) * (func === 'decr' ? -1 : 1);
  // set the new value
  lru.set(funcArgs[0], value);
  return value;
};

const getMultipleValues = async (options) =>
  handleMultipleValues('mGet', options);
const setMultipleValues = async (options) =>
  handleMultipleValues('mSet', options);
const deleteMultipleValues = async (options) =>
  handleMultipleValues('mDel', options);

const handleMultipleValues = async (func, { namespace, options, funcArgs }) => {
  const lru = getLruCache(caches, namespace, options);
  return utils[func](lru, funcArgs);
};

const defaultLruFunction = async ({
  caches,
  namespace,
  options,
  func,
  funcArgs,
}) => {
  const lru = getLruCache(caches, namespace, options);
  if (typeof lru[func] !== 'function') {
    throw new Error(`LRUCache.${func}() is not a valid function`);
  }
  // just call the function on the lru-cache
  return lru[func](...funcArgs);
};

const promiseHandlerFunctions = {
  mGet: getMultipleValues,
  mSet: setMultipleValues,
  mDel: deleteMultipleValues,
  decr: incrementOrDecrement,
  incr: incrementOrDecrement,
  max: getOrSetConfigValue,
  maxAge: getOrSetConfigValue,
  allowStale: getOrSetConfigValue,
  itemCount: getOrSetConfigValue,
  length: getOrSetConfigValue,
};

const getPromiseHandler = (func) => {
  const handler = promiseHandlerFunctions[func];
  return handler ? handler : defaultLruFunction;
};

// return a promise that resolves to the result of the method on
// the local lru-cache this is the master thread, or from the
// lru-cache on the master thread if this is a worker
module.exports = {
  caches,
  getPromisified: ({ namespace }, options) => {
    // create the new LRU cache
    const cache = getLruCache(caches, namespace, options);
    utils.setCacheProperties(cache, options);
    // return function to promisify function calls
    return async (...args) => {
      // acting on the local lru-cache
      messages(namespace, args);
      // first argument is the function to run
      const func = args[0];
      // the rest of the args are the function arguments of N length
      const funcArgs = Array.prototype.slice.call(args, 1, args.length);
      // this returns an async function to handle the function call
      return getPromiseHandler(func)({
        caches,
        namespace,
        options,
        func,
        funcArgs,
      });
    };
  },
};