/*
 * lib/dispatch/beacon.js
 *
 * Summary
 *
 * Maintainability
 * D
 * 3 days
 * Test Coverage
 */
/**
 *  @title joola
 *  @overview the open-source data analytics framework
 *  @copyright Joola Smart Solutions, Ltd. <info@joo.la>
 *  @license GPL-3.0+ <http://spdx.org/licenses/GPL-3.0+>
 *
 *  Licensed under GNU General Public License 3.0 or later.
 *  Some rights reserved. See LICENSE, AUTHORS.
 **/

'use strict';

var
  joola = require('../joola'),
  domain = require('domain'),
  diff = require('deep-diff').diff,
  async = require('async'),
  traverse = require('traverse'),
  router = require('../webserver/routes/index'),
  equal = require('deep-equal'),
  ce = require('cloneextend'),
  path = require('path'),
  _ = require('underscore'),
  fs = require('fs');

// Verify/load stages used by `exports.insert.run` below; the same object is
// also exposed to other modules as `exports.etl`.
var etl = exports.etl = {};

/**
 * Removes every falsy element (undefined, null, 0, '', false, NaN and holes)
 * from the array in place and returns the same array.
 *
 * Installed as a non-enumerable property: a plain assignment to
 * Array.prototype would make `clean` show up as a key in every `for...in`
 * over any array in the process. Compaction is done in a single pass rather
 * than with repeated `splice`, which was O(n^2).
 *
 * @param {*} [deleteValue] - ignored; kept only for backward compatibility.
 *   All falsy values are removed regardless of the argument.
 * @returns {Array} this array, compacted.
 */
Object.defineProperty(Array.prototype, 'clean', {
  configurable: true,
  writable: true,
  enumerable: false,
  value: function(deleteValue) {
    var kept = 0;
    for (var i = 0; i < this.length; i++) {
      if (this[i])
        this[kept++] = this[i];
    }
    // Truncate the tail left behind by the compaction.
    this.length = kept;
    return this;
  }
});

/**
 * Normalizes a batch of incoming documents and makes sure the target
 * collection exists with metadata compatible with them.
 *
 * Per document (mutated in place):
 *  - the collection's time field (`time_field`, default `timestamp`) becomes a
 *    Date: "now" when missing, otherwise built from the field's `.value` or
 *    from the field itself;
 *  - every IP-typed field (`datatype === 'ip'`, `key === 'ip'`, or a field
 *    named `ip`) is replaced by the `joola.common.geoip.lookup` result (minus
 *    `range`, plus `address`), and `doc.location` is set from its `ll` pair
 *    (0/0 when the lookup returns nothing usable);
 *  - `doc._key` is set to a hash of the ISO time plus every string value.
 *
 * Then metadata is inferred from a clone of the first document and:
 *  - if the collection does not exist, it is added (an "already exist" error
 *    is tolerated) and, when the default datastore provider implements
 *    `addcollection`, created there as well;
 *  - if it exists and the metadata differs, it is patched and, when the
 *    default provider implements `altercollection`, that is called with the
 *    full diff. Properties present only in the collection-aware metadata do
 *    not count as a difference. Strong-typed collections are never modified;
 *    a mismatch yields an Error instead.
 *
 * @param {object} context - request context, passed to every dispatch call.
 * @param {string} workspace - workspace key.
 * @param {string} collection - collection key.
 * @param {object|object[]} documents - one document or an array of them; when
 *   an array is given, falsy entries are removed from it in place.
 * @param {function(Error=)} callback - invoked once; with an error on failure.
 */
etl.verify = function(context, workspace, collection, documents, callback) {
  if (!Array.isArray(documents))
    documents = [documents];

  documents.clean();

  // Adds the collection using the document-inferred metadata, then creates
  // its datastore counterpart when the default provider supports it.
  var createCollection = function(meta) {
    meta.key = collection;
    meta.name = collection;
    joola.dispatch.collections.add(context, workspace, meta, function(err) {
      /* istanbul ignore if */
      if (err && err.message !== 'Collection already exist')
        return callback(err);

      // Re-read so that a collection that still cannot be fetched is reported.
      joola.dispatch.collections.get(context, workspace, collection, function(err) {
        if (err)
          return callback(err);

        var provider = joola.datastore.providers.default;
        if (!provider.addcollection)
          return callback(null);
        provider.addcollection(meta.key, meta, function(err) {
          if (err)
            return callback(err);
          return callback(null);
        });
      });
    });
  };

  // Compares the document-inferred metadata with the collection-aware one and
  // patches the collection when they disagree (unless it is strong-typed).
  var reconcileCollection = function(meta, existing, sample) {
    joola.dispatch.collections.metadata(context, workspace, collection, sample, function(err, _meta) {
      /* istanbul ignore if */
      if (err)
        return callback(err);

      delete meta._key;
      var differences;
      var match = equal(meta, _meta);
      if (!match) {
        // deep-diff kind 'N' marks a property present only in `_meta`; such
        // additions are not treated as a mismatch.
        differences = _.filter(diff(meta, _meta), function(change) {
          return change.kind !== 'N';
        });
        match = differences.length === 0;
      }

      if (match)
        return callback(null);

      if (existing.strongTyped) {
        joola.logger.warn({
          category: 'beacon',
          diff: differences
        }, 'Strong-typed collection [' + collection + '] cannot be modified by Beacon.');
        return callback(new Error('Strong-typed collection [' + collection + '] cannot be modified by Beacon.'));
      }

      // The datastore receives the full, unfiltered diff.
      differences = diff(meta, _meta);
      meta.key = collection;
      meta.name = collection;
      joola.dispatch.collections.patch(context, workspace, meta.key, meta, function(err) {
        // Propagate patch failures like every other dispatch call here, rather
        // than altering the datastore for a collection that was not patched.
        if (err)
          return callback(err);

        joola.logger.debug({
          category: 'beacon'
        }, 'Updating collection [' + collection + '] due to meta change.');

        var provider = joola.datastore.providers.default;
        if (!provider.altercollection)
          return callback(null);
        provider.altercollection(meta.key, meta, differences, function(err) {
          if (err)
            return callback(err);
          return callback(null);
        });
      });
    });
  };

  joola.dispatch.collections.get(context, workspace, collection, function(err, _collection) {
    // The collection may not exist yet; fall back to the default time field.
    if (err)
      _collection = {
        time_field: 'timestamp'
      };
    var timeField = (_collection && _collection.time_field) || 'timestamp';

    // Hash of the ISO time followed by every string value in the document.
    var buildRowKey = function(obj) {
      var key;
      try {
        key = obj[timeField].toISOString();
      } catch (ex) {
        // Invalid Date (toISOString throws) - hash the string values alone.
        key = '';
      }
      // Read-only walk; traverse.map would deep-clone the document first.
      traverse.forEach(obj, function(x) {
        if (x && typeof x === 'string')
          key += x;
      });
      return joola.common.hash(key);
    };

    // Replaces an IP-typed field with its geo-IP lookup and sets doc.location.
    var enrichIp = function(doc, key) {
      var elem = doc[key];
      // null/undefined cannot be IP-typed; skip them instead of throwing on
      // the property reads below.
      if (elem === null || elem === undefined)
        return;
      if (elem.datatype !== 'ip' && elem.key !== 'ip' && key !== 'ip')
        return;

      var ipaddress = elem.value || elem;
      var ip = joola.common.geoip.lookup(ipaddress);
      if (ip && typeof ip === 'object') {
        delete ip.range;
        ip.address = ipaddress;
        doc[key] = ip;
        doc.location = {
          lat: ip.ll[0],
          lon: ip.ll[1]
        };
      } else {
        doc[key] = {
          address: ipaddress
        };
        doc.location = {
          lat: 0,
          lon: 0
        };
      }
    };

    async.map(documents, function(doc, cb) {
      try {
        var ts = doc[timeField];
        doc[timeField] = ts ? new Date(ts.value || ts) : new Date();

        Object.keys(doc).forEach(function(key) {
          try {
            enrichIp(doc, key);
          } catch (ex) {
            // Best effort: a failed lookup leaves the field as-is and the
            // document is still accepted.
            console.log('exception', ex);
          }
        });
      } catch (ex) {
        // Assigning to a primitive (non-object document) throws in strict
        // mode, so only touch real objects before reporting the error.
        if (doc && typeof doc === 'object')
          doc[timeField] = new Date();
        return cb(ex);
      }
      doc._key = buildRowKey(doc);
      return cb(null);
    }, function(err) {
      if (err)
        return callback(err);

      // Metadata is inferred from a clone of the first document only.
      var sample = documents.length > 0 ? ce.clone(documents[0]) : undefined;

      joola.dispatch.collections.metadata(context, workspace, null, sample, function(err, meta) {
        /* istanbul ignore if */
        if (err)
          return callback(err);

        joola.dispatch.collections.get(context, workspace, collection, function(err, existing) {
          if (err)
            return createCollection(meta);
          return reconcileCollection(meta, existing, sample);
        });
      });
    });
  });
};

/**
 * Writes verified documents to every enabled datastore provider.
 *
 *  1. Resolves collection metadata (from a clone of the first document) and
 *     stamps the collection with `meta` and a `storeKey` of
 *     `<workspace>_<collection>` with every character other than letters,
 *     digits, `_` and whitespace removed.
 *  2. Replaces `{value: x}` nodes with `x`, in place. The first document is
 *     always scanned; later ones are scanned only if the first contained such
 *     a node.
 *  3. Inside a domain, calls `insert(collection, clone(documents), options, cb)`
 *     on every provider other than `default` whose `enabled` flag is truthy or
 *     not set as an own property.
 *  4. Flags every document with `saved = true`.
 *
 * @param {object} context - request context.
 * @param {string} workspace - workspace key.
 * @param {string} collection - collection key.
 * @param {object[]} documents - documents to store; mutated in place.
 * @param {object} options - passed through to each provider's insert.
 * @param {function} callback - callback(err, documents, providerResults). An
 *   error caught by the domain is reported as the string
 *   'Failed to insert document: <err>'.
 */
etl.load = function(context, workspace, collection, documents, options, callback) {
  var sample = ce.clone(documents[0]);
  joola.dispatch.collections.metadata(context, workspace, collection, sample, function(err, meta, _collection) {
    if (err)
      return callback(err);

    _collection.meta = meta;
    _collection.storeKey = (workspace + '_' + collection).replace(/[^\w\s]/gi, '');

    var scanned = 0;
    var hasWrappedValues = false;
    // traverse invokes this with `this` bound to the current node's context.
    var unwrap = function(node) {
      if (node && typeof node === 'object' && node.hasOwnProperty('value')) {
        hasWrappedValues = true;
        this.update(node.value);
      }
    };

    async.map(documents, function(document, cb) {
      if (scanned === 0 || hasWrappedValues) {
        traverse.forEach(document, unwrap);
        scanned++;
      }
      return cb(null);
    }, function() {
      var sandbox = domain.create();
      sandbox.on('error', function(err) {
        console.log(err);
        joola.logger.error('Failed to insert document, ' + err);
        return callback('Failed to insert document: ' + err);
      });
      sandbox.run(function() {
        var targets = Object.keys(joola.datastore.providers).filter(function(key) {
          if (key === 'default')
            return false;
          var candidate = joola.datastore.providers[key];
          return candidate.enabled || !candidate.hasOwnProperty('enabled');
        });

        async.map(targets, function(key, cb) {
          joola.datastore.providers[key].insert(_collection, ce.clone(documents), options, function(err, results) {
            if (err)
              return cb(err);
            return cb(null, results);
          });
        }, function(err, results) {
          if (err)
            return callback(err);

          async.map(documents, function(doc, cb) {
            doc.saved = true;
            return cb(null, doc);
          }, function(err, saved) {
            if (err)
              return callback(err);
            return callback(null, saved, results);
          });
        });
      });
    });
  });
};

exports.insert = {
  name: "/beacon/insert",
  description: "",
  inputs: {
    required: ['workspace', 'collection', 'document'],
    optional: ['options']
  },
  _outputExample: {},
  _permission: ['beacon:insert'],
  _dispatch: {
    message: 'beacon:insert'
  },
  _route: function(req, res) {
    var _params = {}; //req.parsed;
    var context = {};
    context.user = req.user;
    _params.workspace = context.user.workspace;
    if (!req.parsed.workspace)
      _params.workspace = req.user.workspace;
    _params.collection = req.parsed.collection;
    _params.document = req.parsed.document || req.parsed.payload;

    var beaconConfig = joola.config.get('beacon');
    var w = beaconConfig && beaconConfig.hasOwnProperty('wait') ? beaconConfig.wait : false;
    if (!req.parsed.options)
      req.parsed.options = {
        wait: w
      };
    else if (!req.parsed.options.hasOwnProperty('wait'))
      req.parsed.options.wait = w;

    if (typeof _params.document === 'string') {
      try {
        _params.document = JSON.parse(_params.document);
      } catch (ex) {
        return router.responseError(400, new router.ErrorTemplate('Document must be a valid JSON'), req, res);
      }
    }
    if (_params.document.length > 0 && typeof _params.document[0] === 'string') {
      try {
        _params.document.forEach(function(d, i, arr) {
          arr[i] = JSON.parse(d);
        });
      } catch (ex) {
        return router.responseError(400, new router.ErrorTemplate('Document must be a valid JSON'), req, res);
      }
    }
    try {
      var _token = req.token;
      var cb = function(err, result, headers) {
        if (err)
          return router.responseError(500, new router.ErrorTemplate('Failed to route action [' + 'insert' + ']: ' + (typeof(err) === 'object' ? err.message : err)), req, res);

        return router.responseSuccess(result, headers, req, res);
      };

      if (joola.dispatch.enabled)
        joola.dispatch.request(_token._ || _token, 'beacon:insert', _params, req.parsed.options.wait ? cb : function() {});
      else
        exports.insert.run.apply(this, [context, _params.workspace, _params.collection, _params.document, _params.options, req.parsed.options.wait ? cb : function() {}]);
      if (!req.parsed.options.wait) {
        cb(null, _params.document, {});
      }
    } catch (ex) {
      return router.responseError(500, new router.ErrorTemplate(ex), req, res);
    }
  },
  run: function(context, workspace, collection, documents, options, callback) {

    if (typeof options === 'function') {
      callback = options;
      options = {};
    } else if (typeof documents === 'function') {
      callback = documents;
      options = {};
      documents = collection;
      collection = workspace;
      workspace = context.user.workspace;
    }

    callback = callback || function() {};
    if (context.user.workspace !== workspace && context.user.permissions.indexOf('superuser') === -1) {
      var err = new Error('Forbidden');
      err.code = 403;
      return callback(err);
    }

    try {
      if (!Array.isArray(documents)) {
        documents = [documents];
      }

      var start_ts = new Date().getTime();
      var count = documents.length;

      etl.verify(context, workspace, collection, documents, function(err) {
        if (err)
          return callback(err);
        etl.load(context, workspace, collection, documents, options || {}, function(err, documents) {
          if (err)
            return callback(err);

          var end_ts = new Date().getTime();
          if (count > 1)
            joola.logger.trace('Beacon insert, count: ' + count + ', total: ' + (end_ts - start_ts) + 'ms, rate: ' + (count / (end_ts - start_ts)) + 'doc/ms');

          if (workspace != '.joola-stats')
            joola.stats.emit({
              event: 'writes',
              workspace: workspace,
              username: context.user.username,
              collection: collection,
              writeCount: count,
              duration_per_doc: (count / (end_ts - start_ts))
            });

          subscribers.forEach(function(subscriber, i) {
            if (subscriber.disconnected)
              subscribers.splice(i, 1);
            else {
              subscriber.emit('event', collection, documents);
            }
          });

          return callback(null, documents);
        });
      });
    } catch (ex) {
      /* istanbul ignore next */
      console.log('exception', ex);
      /* istanbul ignore next */
      console.log(ex.stack);

      return callback(ex);
    }
  }
};

var subscribers = [];

exports.subscribe = {
  name: "/beacon/subscribe",
  description: "I subscribe to socket.io",
  inputs: {
    required: [],
    optional: []
  },
  _outputExample: {},
  _permission: ['beacon:subscribe'],
  _dispatch: {
    message: 'subscribe',
    limit: -1
  },
  _route: function(req, res, next) {
    subscribers.push(req.socket);
    return router.responseSuccess({}, {}, req, res);
  },
  run: function(context, callback) {
    callback = callback || function() {};

    //subscribers.push(req.socket);
    return callback(null, {});
  }
};