lib/dispatch/index.js

Summary

Maintainability
F
6 days
Test Coverage
/**
 *  @title joola
 *  @overview the open-source data analytics framework
 *  @copyright Joola Smart Solutions, Ltd. <info@joo.la>
 *  @license GPL-3.0+ <http://spdx.org/licenses/GPL-3.0+>
 *
 *  Licensed under GNU General Public License 3.0 or later.
 *  Some rights reserved. See LICENSE, AUTHORS.
 **/

'use strict';

var
  joola = require('../joola'),

  events = require('events'),
  domain = require('domain'),
  url = require('url'),
  fs = require('fs'),
  path = require('path'),
  redis = require('redis'),
  ce = require('cloneextend'),

  Stomp = require('stomp'),

  _ = require('underscore');

var dispatch = module.exports;

dispatch.init = function (callback) {
  try {
    var self = dispatch;

    self.namespace = 'dispatch';

    self.subscribed = false;
    self.subscriptions = [];
    self.listeners = [];
    self.tracers = [];
    self.initialized = false;
    self.events = new events.EventEmitter();
    self.enabled = joola.config.get('dispatch:enabled');
    /* istanbul ignore if */
    if (typeof self.enabled === 'undefined')
      self.enabled = true;

    var redisConfig = joola.config.get('store:dispatch:redis');
    if (self.enabled && redisConfig && redisConfig.dsn) {
      var redisdsn = redisConfig.dsn;
      var parsed_url = url.parse(redisdsn);
      var parsed_auth = (parsed_url.auth || '').split(':');

      self.host = parsed_url.host.split(':')[0];
      self.port = parsed_url.port || 6379;
      self.db = parsed_auth[0];
      self.auth = parsed_auth[1];

      self.redis = redis.createClient(self.port, self.host);
      self.publisher = redis.createClient(self.port, self.host);
      self.subscriber = redis.createClient(self.port, self.host);

      self.redis.select(self.db);
      self.publisher.select(self.db);
      self.subscriber.select(self.db);

      /* istanbul ignore if */
      if (self.auth) {
        self.redis.auth(self.auth);
        self.publisher.auth(self.auth);
        self.subscriber.auth(self.auth);
      }

      self.publisher.on('connect', function () {
        joola.logger.debug('[publisher] Connected to redis @ ' + self.host + ':' + self.port + '#' + self.db);
        joola.state.set('publisher', 'working', 'redis [publisher] is up.');
      });
      // Suppress errors from the Redis client
      /* istanbul ignore next */
      self.publisher.on('error', function (err) {
        joola.state.set('publisher', 'failure', 'redis [publisher] is down: ' + (typeof(err) === 'object' ? err.message : err));
      });
      self.subscriber.on('connect', function () {
        joola.logger.debug('[subscriber] Connected to redis @ ' + self.host + ':' + self.port + '#' + self.db);
        joola.state.set('subscriber', 'working', 'redis [subscriber] is up.');
      });
      /* istanbul ignore next */
      self.subscriber.on('error', function (err) {
        joola.state.set('subscriber', 'failure', 'redis [subscriber] is down: ' + (typeof(err) === 'object' ? err.message : err));
      });
      /* istanbul ignore next */
      self.redis.on('error', function (err) {
        joola.state.set('publisher', 'failure', 'redis [publisher] is down: ' + (typeof(err) === 'object' ? err.message : err));
        joola.state.set('subscriber', 'failure', 'redis [subscriber] is down: ' + (typeof(err) === 'object' ? err.message : err));
      });
    }
    else {
      self.enabled = false;
      self.subscriber = new events.EventEmitter();
      self.publisher = new events.EventEmitter();
    }
    dispatch.subscriber.on('message', function (channel, message) {
      var listeners;
      listeners = _.filter(dispatch.listeners, function (l) {
        return l._dispatch.message == channel.replace('-request', '').replace('-done', '');
      });
      listeners.forEach(function (listener) {
        joola.common.parse(message, function (err, message) {
          return process.nextTick(function () {
            return listener._dispatch.cb.call(listener, channel, message);
          });
        });
      });
    });

    var stompSettings = {
      debug: false
    };
    var stompConfig = joola.config.get('store:dispatch:stomp');
    if (self.enabled && stompConfig && stompConfig.dsn) {
      var stompdsn = stompConfig.dsn;
      var parts = url.parse(stompdsn);
      stompSettings.host = parts.host.split(':')[0];
      stompSettings.port = parseInt(parts.port || 61613, 10);
      stompSettings.user = parts.auth.split(':')[0];
      stompSettings.password = parts.auth.split(':')[1];
      stompSettings.vhost = (parts.path ? parts.path.replace('/', '') : null);

      dispatch.listen = function (client, destination, selector, messageProcessorFunction) {
        var subscribeArgs = {
          destination: destination,
          //selector: 'RoutingKey LIKE '%s',
          ack: 'auto'
        };

        client.subscribe(subscribeArgs, messageProcessorFunction);
      };

      dispatch.publishMessage = function (client, message, queueName, routingKey, replyTo) {
        var publisherArgs = {
          body: JSON.stringify(message),
          persistent: 'true'
        };

        if (replyTo)
          publisherArgs['reply-to'] = '/temp-queue/' + queueName.replace('/queue/', '') + '.done';

        if (routingKey) {
          publisherArgs.RoutingKey = routingKey;
        }
        publisherArgs.destination = queueName;

        var wantReceipt = false;
        client.send(publisherArgs, wantReceipt);
      };

      var reconnectTimer;

      var stompReconnectInterval = 1000;
      var stompConnect = function () {
        var stompArgs = {
          host: stompSettings.host,
          port: stompSettings.port,
          login: stompSettings.user,
          passcode: stompSettings.password,
          vhost: stompSettings.vhost,
          debug: stompSettings.debug,
          timeout: 0
        };

        try {
          joola.logger.debug('STOMP connecting to ' + stompSettings.host + ':' + stompSettings.port + '@' + stompSettings.vhost);
          dispatch.stompClient = new Stomp().Client(stompArgs);
          var connected = false;
          dispatch.stompClient.on('connected', function () {
            self.initialized = true;
            /* istanbul ignore if */
            if (reconnectTimer) {
              clearInterval(reconnectTimer);
              reconnectTimer = null;
            }
            joola.logger.debug('STOMP connected @ ' + stompSettings.host + ':' + stompSettings.port);
            dispatch.hook();
            joola.state.set('dispatch', 'working', 'dispatch is up.');
            /* istanbul ignore if */
            if (!connected) {
              connected = true;
              joola.events.emit('dispatch:ready');
              return process.nextTick(function () {
                return callback(null);
              });
            }
          });
        }
        catch (ex) {
          /* istanbul ignore next */
          joola.state.set('dispatch', 'failure', 'dispatch is down: ' + ex);
        }

        /* istanbul ignore next */
        dispatch.stompClient.on('disconnected', function (reason) {
          joola.logger.debug('STOMP disconnect, ' + (reason || 'no reason specified.'));
          if (!reconnectTimer) {
            reconnectTimer = setInterval(function () {
              joola.logger.debug('Attempting reconnect to STOMP');
              dispatch.stompClient.connect();
            }, stompReconnectInterval);
            stompReconnectInterval = stompReconnectInterval * 2;
          }
        });

        /*
         dispatch.stompClient.on('receipt', function (receipt) {

         });
         */

        /* istanbul ignore next */
        dispatch.stompClient.on('closed', function (reason) {
          joola.logger.debug('STOMP closed: ' + reason);
          //if STOMP fails on initialization this is where it's caught.
          if (!self.initialized)
            return callback(new Error('STOMP Closed: ' + reason));
        });

        /* istanbul ignore next */
        dispatch.stompClient.on('error', function (errorFrame) {
          if (reconnectTimer) {
            joola.logger.error('STOMP error: ' + errorFrame);
            joola.state.set('dispatch', 'failure', 'dispatch is down: ' + errorFrame);
            try {
              dispatch.stompClient.disconnect();
            }
            catch (ex) {
              //ignore
            }
          }
        });

        /* istanbul ignore next */
        dispatch.stompClient.on('SIGINT', function () {
          joola.logger.error('STOMP SIGINT');
          joola.state.set('dispatch', 'failure', 'dispatch is down: SIGINT');
          try {
            dispatch.stompClient.disconnect();
          }
          catch (ex) {
            //ignore
          }
        });

        dispatch.stompClient.connect();
      };
      stompConnect();
    }
    else {
      self.enabled = false;
      dispatch.hook();
      joola.state.set('dispatch', 'working', 'dispatch is up.');
      joola.events.emit('dispatch:ready');
      return process.nextTick(function () {
        return callback(null);
      });
    }
  }
  catch (ex) {
    /* istanbul ignore next */
    console.log(ex);
    /* istanbul ignore next */
    return process.nextTick(function () {
      return callback(ex);
    });
  }
};

dispatch.hook = function () {
  fs.readdirSync(path.join(__dirname, './')).forEach(function (file) {
    if (file != 'index.js' && file != 'index2.js' && file != 'prototypes') {
      try {
        var module = require('./' + file);
        var modulename = file.replace('.js', '');
        dispatch[modulename] = {};
        if (!joola[modulename])
          joola[modulename] = {};
        Object.keys(module).forEach(function (fn) {
          var _fn = module[fn];
          dispatch.setup(_fn, function (err) {
            if (err) {
              //do something
            }

            dispatch[modulename][fn] = _fn.run;
            if (!joola[modulename][fn])
              joola[modulename][fn] = _fn.run;
          });
        });
      } catch (ex) {
        /* istanbul ignore next */
        console.log('Failure in loading dispatch module [' + file + ']: ' + ex);
        /* istanbul ignore next */
        console.log(ex.stack);
      }
    }
  });
};

dispatch.setup = function (fn, callback) {
  var details = fn._dispatch;

  if (!details)
    return process.nextTick(function () {
      return callback(new Error('Failed to locate _dispatch'));
    });

  var listener = _.find(dispatch.listeners, function (listener) {
    return listener._dispatch.message == details.message;
  });
  if (listener) {
    //joola.logger.trace('[dispatch] Channel [' + details.message + '] already registered.');
  }
  else {
    fn._dispatch = fn._dispatch || {};
    fn._dispatch.criteria = fn._dispatch.criteria || 'notme';
    fn._dispatch.limit = fn._dispatch.limit || 1;

    dispatch.listeners.push(fn);
    if (!details.message)
      return callback(null);
  }
  if (dispatch.enabled && details.message) {
    dispatch.listen(dispatch.stompClient, '/queue/joola.dispatch.' + details.message.replace(':', '.'), "", function name(message, headers) {
      dispatch.processRequest(message, headers);
    });
    dispatch.listen(dispatch.stompClient, '/temp-queue/joola.dispatch.' + details.message.replace(':', '.') + '.done', "", function name(message, headers) {
      dispatch.processResponse(message, headers);
    });
  }

  return process.nextTick(function () {
    return callback(null);
  });
};
var first = false;

dispatch.processRequest = function (message, headers) {
  message = message[0];
  message = JSON.parse(message).message;

  var _result;
  joola.auth.getUserByToken(message.token, function (err, user) {
    /* istanbul ignore if */
    if (err) {
      dispatch.fulfill(message, {
        err: err,
        message: null
      }, headers);
    }
    var context = {};
    context.user = user;

    var payload = message.payload;
    var _params = [];
    _params.push(context);

    /* istanbul ignore else */
    if (payload && typeof payload === 'object') {
      Object.keys(payload).forEach(function (key) {
        _params.push(payload[key]);
      });
    }
    else if (message.payload && message.payload != '{}') {
      _params.push(message.payload);
    }
    _params.push(function (err, result) {
      delete arguments[0];
      _result = {
        err: err,
        message: arguments
      };

      dispatch.fulfill(message, _result, headers);
    });

    try {
      var listeners = _.filter(dispatch.listeners, function (l) {
        return l._dispatch.message === headers.destination.replace('/queue/joola.dispatch.', '').replace('.', ':');
      });
      listeners.forEach(function (listener) {
        var d = domain.create();
        /* istanbul ignore next */
        d.on('error', function (err) {
          joola.logger.warn(err, 'Failed to process dispatch request: ' + err);
          _result = {
            err: err,
            message: null
          };
          dispatch.fulfill(message, _result, headers);
        });
        d.run(function () {
          listener.run.apply(message, _params);
        });
      });
    }
    catch (ex) {
      /* istanbul ignore next */
      dispatch.fulfill(message, {
        err: ex,
        message: null
      }, headers);
    }
  });
};

dispatch.processResponse = function (message, headers) {
  message = message[0];
  message = JSON.parse(message);
  var listeners = _.filter(dispatch.listeners, function (l) {
    return l._dispatch.message === headers.subscription.replace('/temp-queue/joola.dispatch.', '').replace('.', ':').replace('.done', '');
  });
  listeners.forEach(function (listener) {
    if (typeof listener[message.id] === 'function') {
      var result = message.result;
      var args = [];
      args.push(result.err);
      /* istanbul ignore else */
      if (joola.common.typeof(result.message) === 'object') {
        Object.keys(result.message).forEach(function (key) {
          args.push(result.message[key]);
        });
      }
      else if (joola.common.typeof(result.err) === 'object') {
        joola.logger.debug(result.err, 'Failed to process dispatch response: ' + result.err);
        args.push(result.err);
      }
      else {
        console.log('Weird result', message);
      }

      delete message.result;
      delete message.payload;
      args.push(message);
      args.push(headers);

      if (listener[message.id]) {
        var d = domain.create();
        /* istanbul ignore next */
        d.on('error', function (err) {
          joola.logger.warn(err, 'Failed to process dispatch response: ' + err);
          //TODO: The assumption here is that the same function that was called and failed will handle the error properly. this is a risky assumption, but it allows for proper error messages to be handled.
          try {
            listener[message.id].apply(listener, [err]);
          }
          catch (ex) {
            //there's nothing we can do at this point. the request will timeout and die.
          }
        });
        d.run(function () {
          listener[message.id].apply(listener, args);
          delete listener[message.id];
        });
      }
    }
  });
};

dispatch.request = function (token, channel, params, callback) {
  var self = dispatch;
  var compiled = {
    id: joola.common.uuid(),
    timestamp: new Date().getTime(),
    'timestamp-nice': new Date(),
    from: joola.UID,
    token: token,
    to: 'any',
    picked: 0,
    fulfilled: 0,
    'fulfilled-by': null,
    'fulfilled-timestamp': null,
    'fulfilled-timestamp-nice': null,
    channel: channel
  };

  var listener = _.find(dispatch.listeners, function (l) {
    return (l._dispatch.message == channel);
  });

  /* istanbul ignore if */
  if (!listener)
    return process.nextTick(function () {
      return callback(new Error('Failed to locate listener for message [' + compiled.id + ']'));
    });

  compiled.payload = params;
  joola.logger.trace('Compiled request [' + compiled.id + '] for channel [' + channel + ']');
  if (typeof callback === 'function') {
    listener[compiled.id] = callback;

    var message = {
      message: compiled,
      queue: "/queue/joola.dispatch." + compiled.channel.replace(':', '.'),
      msgId: compiled.id
    };
    dispatch.publishMessage(dispatch.stompClient, message, '/queue/joola.dispatch.' + compiled.channel.replace(':', '.'), null, true);
  }
};

dispatch.fulfill = function (message, result, headers, callback) {
  var self = dispatch;
  callback = callback || function () {
  };

  message.fulfilled = 1;
  message['fulfilled-by'] = joola.UID;
  message['fulfilled-timestamp'] = new Date().getTime();
  message['fulfilled-timestamp-nice'] = new Date();
  message['fulfilled-duration'] = parseInt(message['fulfilled-timestamp']) - parseInt(message.timestamp);

  message.result = result;

  dispatch.publishMessage(dispatch.stompClient, message, headers['reply-to']);
  joola.logger.trace('[dispatch] Fulfilled request [' + message.id + '] for channel [' + message.channel + ']');
};

dispatch.on = function (channel, callback, next) {
  callback = callback || function () {
  };
  var exists = _.find(dispatch.listeners, function (listener) {
    return listener._dispatch.message == channel && (listener._dispatch.cb ? listener._dispatch.cb.toString() : null) == (callback ? callback.toString() : null);
  });
  /* istanbul ignore else */
  if (!exists) {
    joola.logger.trace('[dispatch] listen on: ' + channel);
    var listener = {
      _dispatch: {
        message: channel,
        cb: callback
      }
    };
    dispatch.listeners.push(listener);
    /* istanbul ignore else */
    if (dispatch.enabled) {
      dispatch.subscriber.subscribe(channel, next);
      return listener;
    }
    else {
      dispatch.events.on(channel, function (message) {
        var found = _.find(dispatch.listeners, function (listener) {
          return listener._dispatch.message === channel;
        });
        if (found)
          found._dispatch.cb.apply(this, [channel, message]);
      });
      if (typeof next === 'function')
        return next(null);
      else
        return listener;
    }
  }
  else {
    callback(new Error('listener on this channel already assigned with the same callback.'));
    return null;
  }
};

dispatch.once = function (channel, callback, next) {
  callback = callback || function () {
  };

  var exists = _.find(dispatch.listeners, function (listener) {
    return listener._dispatch.message == channel && (listener._dispatch.cb ? listener._dispatch.cb.toString() : null) == (callback ? callback.toString() : null);
  });
  if (!exists) {
    joola.logger.trace('[dispatch] listen on: ' + channel);
    var listener = {
      _dispatch: {
        message: channel,
        replies: 0,
        cb: function () {
          ++this._dispatch.replies;
          if (this._dispatch.replies > 1)
            return;

          dispatch.removeListener(this._dispatch.message, this._dispatch.cb);
          return process.nextTick(function () {
            return callback.apply(this, arguments);
          });
        }
      }
    };
    dispatch.listeners.push(listener);
    if (dispatch.enabled)
      dispatch.subscriber.subscribe(channel, next);
    else {
      dispatch.events.once(channel, callback);
      if (typeof next === 'function')
        return next(null);
      else
        return null;

    }

    return listener;
  }
  else {
    callback(new Error('listener on this channel already assigned with the same callback.'));
    return null;
  }
};

dispatch.emit = function (channel, message, callback) {
  callback = callback || function () {
  };
  dispatch.events.emit(channel, message);
  /* istanbul ignore else */
  if (dispatch.publisher) {
    /* istanbul ignore else */
    if (dispatch.enabled) {
      dispatch.publisher.publish(channel, JSON.stringify(message), callback);
    }
    else
      return callback(null);
    joola.logger.trace('[dispatch] emit on: ' + channel);
  }
  else {
    joola.logger.warn('Dispatch emit before ready, channel [' + channel + ']');
    joola.logger.trace(message);
    return process.nextTick(function () {
      return callback(new Error('Dispatch is not ready yet'));
    });
  }
};

dispatch.removeListener = function (channel, cb) {
  var found = false;
  dispatch.listeners.forEach(function (listener, i) {
    if (listener._dispatch.message == channel && (listener._dispatch.cb ? listener._dispatch.cb.toString() : null) == (cb ? cb.toString() : null)) {
      found = dispatch.listeners.splice(i, 1);
    }
  });
  return found;
};

dispatch.removeAllListeners = function (channel) {
  var found = [];
  var indexes = [];
  dispatch.listeners.forEach(function (listener, i) {
    if (listener._dispatch.message == channel) {
      indexes.push(i);
    }
  });
  indexes.forEach(function (i, currentindex) {
    i = i - (currentindex);
    found.push(dispatch.listeners.splice(i, 1));
  });
  return found;
};