lib/dispatch/query.js

Summary

Maintainability
F
1 wk
Test Coverage
/**
 *  @title joola
 *  @overview the open-source data analytics framework
 *  @copyright Joola Smart Solutions, Ltd. <info@joo.la>
 *  @license GPL-3.0+ <http://spdx.org/licenses/GPL-3.0+>
 *
 *  Licensed under GNU General Public License 3.0 or later.
 *  Some rights reserved. See LICENSE, AUTHORS.
 **/

'use strict';

var
  joola = require('../joola'),

  domain = require('domain'),
  _ = require('underscore'),
  async = require('async'),
  ce = require('cloneextend'),

  localeval = require('notevil'),

  router = require('../webserver/routes/index');

var manager = {};

function fixOffset(date) {
  if (joola.config.get('query') && joola.config.get('query:fixgmt')) {
    var _date = new Date(date);
    _date.setHours(_date.getHours() + (-1 * require('moment')().zone() / 60));
    return new Date(_date);
  } else
    return date;
}

manager.parseDimensions = function(context, dimensions, callback) {
  var _dimensions = [];
  if (dimensions.length === 0)
    return callback(null, _dimensions);


  //search for location dimensions
  /*
   dimensions.forEach(function (d, i) {
   var key = d.key || d.dependsOn || d.attribute || d;
   //if (key === 'location') {
   dimensions[i] = {key: d.key || d, skip: true};
   dimensions.push({key: 'location.lat', fake: true});
   dimensions.push({key: 'location.lon', fake: true});
   //}
   });*/

  function parseDimension(d, cb) {
    return manager.parseDimension(context, d, function(err, dimension) {
      if (err)
        return cb(err);
      _dimensions.push(dimension);
      return cb(null, dimension);
    });
  }

  function sendResults(err, results) {
    if (err)
      return callback(err);
    return callback(null, _dimensions);
  }

  async.map(dimensions, parseDimension, sendResults);
};

manager.parseMetrics = function(context, metrics, callback) {
  var _metrics = [];

  var expected = 0;
  if (metrics.length === 0)
    return callback(null, _metrics);

  function parseMetric(m, cb) {
    var index = expected++;
    return manager.parseMetric(context, m, function(err, metric) {
      if (err)
        return cb(err);
      _metrics[index || _metrics.length] = metric;
      return cb(null);
    });
  }

  function sendResults(err, results) {
    if (err)
      return callback(err);
    return callback(null, _metrics);
  }

  async.map(metrics, parseMetric, sendResults);
};

manager.parseDimension = function(context, d, callback) {
  var col;
  var dimension = {};

  if (!d)
    return callback(new Error('Failed to parse dimension [' + d + ']'));

  var prepareDimension = function(collection, dimension, parentDimension, callback) {
    if (dimension)
      dimension.adhoc = true;
    else
      dimension = {};

    if (!dimension.datatype) {
      if (['timestamp', 'date'].indexOf(dimension.key) > -1)
        dimension.datatype = 'date';
      else
        dimension.datatype = 'string';
    }

    var keep = {
      name: dimension.name
    };
    dimension = ce.extend(dimension, parentDimension);
    if (keep.name)
      dimension.name = keep.name;
    dimension.attribute = dimension.attribute || dimension.dependsOn || dimension.key;
    dimension.collection = collection;

    if (dimension.key.indexOf('.') > -1)
      dimension.key = dimension.key.replace(/\./ig, '_');
    return callback(null, dimension);
  };

  if (typeof d !== 'object') {
    d = {
      key: d
    };
  }
  //if this is a computed dimension, simply pass it on
  if (!d.dependsOn)
    d.dependsOn = d.key;
  if (!d.name)
    d.name = d.key;

  d.collection = d.collection || context.query.collection;
  //let's find the metric
  if (d.collection) {
    //we're looking for a specific collection containing this metric
    joola.dispatch.collections.get(context, context.user.workspace, d.collection, function(err, collection) {
      if (err)
        return callback(err);

      col = collection[d.dependsOn] ? collection : null;

      if (!col && d.dependsOn.indexOf('.') > -1) {
        //let's see if we're dealing with a compound key
        if (joola.common.checkNested(collection, d.dependsOn))
          col = collection;
      }

      if (!col)
        return callback(new Error('Failed to locate collection for dimension [' + dimension.key + ']'));
      return prepareDimension(col, d, col[d], callback);
    });
  } else {
    return callback(new Error('Failed to locate collection for dimension [' + dimension.key + ']'));
  }
};

manager.parseMetric = function(context, m, callback) {
  var col;
  var metric = {};
  var metricIndex = m.index;

  if (!m)
    return callback(new Error('Failed to parse metric [' + m + ']'));

  var prepareMetric = function(collection, metric, parentMetric, callback) {
    try {
      if (!metric)
        metric = {};
      if (!parentMetric)
        metric.adhoc = true;
      var keep = {
        name: metric.name
      };
      metric = ce.extend(metric, parentMetric);
      if (!metric.name)
        metric.name = metric.key;
      //if (keep.name)
      //  metric.name = keep.name;
      metric.attribute = metric.attribute || metric.dependsOn || metric.key;
      metric.collection = collection;

      if (!metric.key)
        metric.key = 'key';

      metric._key = metric._key || metric.key;
      if (metric.key.indexOf('.') > -1)
        metric.key = metric.key.replace(/\./ig, '_');
      metric.index = metricIndex;
      return callback(null, metric);
    } catch (ex) {
      return callback(ex);
    }
  };
  if (typeof m !== 'object') {
    m = {
      key: m
    };
  }
  //if this is a computed metric, simply pass it on
  if (m.formula) {
    m.index = metricIndex;
    m._key = m.key;
    return callback(null, m);
  }
  m.collection = m.collection || context.query.collection;
  if (!m.key && m.dependsOn)
    m.key = m.dependsOn;
  if (!m.dependsOn)
    m.dependsOn = m.key;
  //if (!m.name)
  //  m.name = m.key;

  //let's find the metric
  if (m.collection) {
    //we're looking for a specific collection containing this metric
    joola.dispatch.collections.get(context, context.user.workspace, m.collection, function(err, collection) {
      if (err)
        return callback(err);

      col = collection[m.dependsOn] ? collection : null;

      if (!col && m.dependsOn.indexOf('.') > -1) {
        //let's see if we're dealing with a compound key
        if (joola.common.checkNested(collection, m.dependsOn)) {
          col = collection;
        }
      }

      if (!col)
        return callback(new Error('Failed to locate collection for metric [' + metric.key + ']'));


      var parentMetric = _.find(collection.metrics, function(item) {
        return item.key === (m.key || m);
      });


      prepareMetric(col, m, parentMetric, callback);
    });
  } else {
    return callback(new Error('Failed to locate collection for metric [' + metric.key + ']'));
  }

  return metric;
};

manager.parse = function(context, options, callback) {
  var query = joola.common.extend({}, options);

  if (!query.dimensions)
    query.dimensions = [];
  if (!query.metrics)
    query.metrics = [];

  if (query.timeframe) {
    if (typeof query.timeframe === 'string')
      query.timeframe = manager.translateTimeframe(query.timeframe, query.interval);

    if (query.timeframe && !query.timeframe.hasOwnProperty('last_n_items')) {
      if (query.timeframe_force_end)
        query.timeframe.end = new Date(query.timeframe_force_end);

      if (query.timeframe_force_start) {
        query.timeframe.start = new Date(query.timeframe_force_start);
      }

      if (typeof query.timeframe.start === 'string')
        query.timeframe.start = new Date(query.timeframe.start);
      if (typeof query.timeframe.end === 'string')
        query.timeframe.end = new Date(query.timeframe.end);

      if (query.timeframe.start)
        query.timeframe.start.setMilliseconds(0);

      if (!query.timeframe.end) {
        //if (query.timeframe.end)
        query.timeframe.end = new Date();
        query.timeframe.end.setMilliseconds(999);
      }

    }
  }

  query._interval = query.interval;
  query.interval = manager.translateInterval(query.interval);

  context.query = query;

  //dimension processing

  manager.parseDimensions(context, query.dimensions, function(err, dimensions) {
    if (err)
      return callback(err);
    query.dimensions = dimensions;

    //metric processing
    manager.parseMetrics(context, query.metrics, function(err, metrics) {
      if (err)
        return callback(err);
      query.metrics = metrics;

      var calls = [];
      if (query.metrics) {
        query.metrics.forEach(function(m) {
          //if this is calculated just pass it along
          if (m.formula) {
            m.formula.dependsOn.forEach(function(dependent) {
              if (typeof dependent !== 'object')
                dependent = {
                  key: dependent
                };
              dependent.collection = dependent.collection || query.collection;
              calls.push(function(callback) {
                dependent.index = 1000;
                manager.parseMetric(context, dependent, function(err, _dep) {
                  if (err) {
                    return callback(err);
                  }
                  var exists = _.find(query.metrics, function(qm) {
                    return qm.key == _dep.key;
                  });
                  if (!exists) {
                    _dep.dependent = true;
                    query.metrics[query.metrics.length] = _dep;
                  }

                  return callback(null, query);
                });
              });
            });
          }
        });
      }
      if (calls.length > 0) {
        async.series(calls, function(err) {
          if (err)
            return callback(err);
          return callback(null, query);
        });
      } else {
        return callback(null, query);
      }
    });
  });
};

manager.applyFilters = function(context, query, callback) {
  if (!query.filter)
    query.filter = [];
  if (context.user.filter && context.user.filter.length > 0) {
    if (Array.isArray(context.user.filter[0])) {
      context.user.filter.forEach(function(arrFilter) {
        var newFilter = [];
        arrFilter.forEach(function(f) {
          newFilter.push(f);
        });
        newFilter = newFilter.concat('--forced');
        query.filter.push(newFilter);
      });
    } else {
      var newFilter = [];
      context.user.filter.forEach(function(f) {
        newFilter.push(f);
      });
      newFilter = newFilter.concat('--forced');
      query.filter.push(newFilter);
    }
  }
  return callback(null, query);
};

manager.translateTimeframe = function(timeframe, interval) {
  var _enddate = new Date();
  //_enddate.setMilliseconds(0);
  //_enddate.setSeconds(_enddate.getSeconds() - 1);
  var _startdate = new Date(_enddate);

  switch (timeframe) {
    case 'this_second':
      timeframe = 'this_1_second';
      break;
    case 'this_minute':
      timeframe = 'this_1_minute';
      break;
    case 'this_hour':
      timeframe = 'this_1_hour';
      break;
    case 'this_day':
      timeframe = 'this_1_day';
      break;
    case 'this_week':
      timeframe = 'this_1_week';
      break;
    case 'this_month':
      timeframe = 'this_1_month';
      break;
    case 'this_year':
      timeframe = 'this_1_year';
      break;

    case 'last_second':
      timeframe = 'last_1_second';
      break;
    case 'last_minute':
      timeframe = 'last_1_minute';
      break;
    case 'last_hour':
      timeframe = 'last_1_hour';
      break;
    case 'last_day':
      timeframe = 'last_1_day';
      break;
    case 'last_week':
      timeframe = 'last_1_week';
      break;
    case 'last_month':
      timeframe = 'last_1_month';
      break;
    case 'last_year':
      timeframe = 'last_1_year';
      break;
    default:
      break;
  }

  var m;
  m = /this_(\d+)_second/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(_startdate.getSeconds() - (m[1] - 1));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /this_(\d+)_minute/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(0);
    _startdate.setMinutes(_startdate.getMinutes() - (m[1] - 1));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /this_(\d+)_hour/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(0);
    _startdate.setMinutes(0);
    _startdate.setHours(_startdate.getHours() - (m[1] - 1));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /this_(\d+)_day/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(0);
    _startdate.setMinutes(0);
    _startdate.setHours(0);
    _startdate.setDate(_startdate.getDate() - (m[1] - 1));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /this_(\d+)_month/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(0);
    _startdate.setMinutes(0);
    _startdate.setHours(0);
    _startdate.setDate(0);
    _startdate.setMonth(_startdate.getMonth() - (m[1] - 1));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /this_(\d+)_year/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate = new Date(_enddate);
    _startdate.setMilliseconds(0);
    _startdate.setSeconds(0);
    _startdate.setMinutes(0);
    _startdate.setHours(0);
    _startdate.setDate(0);
    _startdate.setMonth(0);
    _startdate.setYear(_startdate.getYear() - (m[1] - 1) + 1900);
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_second/.exec(timeframe);
  if (m && m.length > 0) {

    _startdate.setSeconds(_startdate.getSeconds() - (m[1]));
    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_minute/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate.setMinutes(_startdate.getMinutes() - (m[1]));

    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_hour/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate.setHours(_startdate.getHours() - (m[1]));

    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_day/.exec(timeframe);
  if (m && m.length > 0) {

    _startdate.setDate(_startdate.getDate() - (m[1]));

    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_month/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate.setMonth(_startdate.getMonth() - (m[1]));

    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_year/.exec(timeframe);
  if (m && m.length > 0) {
    _startdate.setYear(_startdate.getYear() - (m[1]) + 1900);

    return {
      start: new Date(_startdate),
      end: new Date(_enddate)
    };
  }

  m = /last_(\d+)_items/.exec(timeframe);
  if (m && m.length > 0) {
    return {
      last_n_items: parseInt(m[1], 10)
    };
  }
};

manager.translateInterval = function(interval) {
  switch (interval) {
    case 'second':
      return 'timebucket.second';
    case 'minute':
      return 'timebucket.minute';
    case 'hour':
      return 'timebucket.hour';
    case 'day':
      return 'timebucket.ddate';
    case 'month':
      return 'timebucket.month';
    case 'year':
      return 'timebucket.year';
    default:
      return 'timebucket.ddate';
  }
};

manager.formatResults = function(results, callback) {
  var query = results.queryplan.query;
  results.documents = results.documents || [];

  results.dimensions = results.dimensions || query.dimensions;
  results.metrics = results.metrics || query.metrics;

  //allow engines to return a fake metric to be removed before processing
  results.dimensions = _.filter(results.dimensions, function(dimension) {
    return dimension.key !== 'fake';
  });
  results.metrics = _.filter(results.metrics, function(metric) {
    return metric.key !== 'fake';
  });

  var dindex = 0;
  async.map(results.documents, function(document, cb) {
    //results.documents.forEach(function (document, dindex) {
    delete document.key;
    results.dimensions.forEach(function(dimension) {
      //document[dimension.key] = document[dimension.key];
      if (dimension.datatype == 'date') {
        document[dimension.key] = document[dimension.key];
      }
      /*
             else if (dimension.datatype === 'ip' && document[dimension.key] && document[dimension.key] !== '(not set)') {
             var ip = document[dimension.key];
             document[dimension.key] = joola.common.extend({ip: ip}, joola.common.geoip.lookup(ip)) || '(not set)';
             }
             else if (dimension.key === 'location') {
             document[dimension.key] = document['location_lat'] + ',' + document['location_lon'];
             delete document['location.lat'];
             delete document['location.lon'];
             }*/
      else if (dimension.datatype === 'geo') {

      } else {
        //if (dimension.attribute !== dimension.key) {
        //document[dimension.attribute] = document[dimension.key];
        //delete document[dimension.key]

        //}
      }


      if (dimension.transform) {
        try {
          var transformFn = localeval('(' + dimension.transform + ')');
          document[dimension.key] = transformFn.apply(dimension, [document[dimension.key]]);
        } catch (ex) {
          document[dimension.key] = null;
        }
      }

    });
    results.metrics.forEach(function(metric) {
      if (metric.formula) {
        var calc;
        try {
          calc = localeval('(' + metric.formula.run + ')');
        } catch (ex) {
          console.log(ex);
          calc = null;
        }
        var args = [];
        metric.formula.dependsOn.forEach(function(dep) {
          if (typeof dep === 'object')
            args.push(document[dep.key]);
          else
            args.push(document[dep]);
        });

        try {
          var calcResult = calc.apply(document, args);
          joola.common.flatGetSet(document, metric._key, calcResult);
          //document[metric.key]
          //document[metric.key] = document[metric.key].toString();
        } catch (ex) {
          joola.common.flatGetSet(document, metric._key, null);
          //document[metric.key] = null;
        }
      }

      if (metric.transform) {
        try {
          var transformFn = localeval('(' + metric.transform + ')');
          var result = transformFn.apply(metric, [document[metric._key]]);
          joola.common.flatGetSet(document, metric._key, result);
        } catch (ex) {
          console.log(ex);
          joola.common.flatGetSet(document, metric._key, null);
          //document[metric.key] = null;
        }
      }

      var mkey = metric._key || metric.key;
      var value = joola.common.flatGetSet(document, mkey);
      if (value) {
        if (metric.hasOwnProperty('decimals')) {
          value = parseFloat(value.toFixed(metric.decimals));
          joola.common.flatGetSet(document, mkey, value);
        }
        if (metric.prefix) {
          value = metric.prefix + value.toString();
          joola.common.flatGetSet(document, mkey, value);
        }
        if (metric.suffix) {
          value = value.toString() + metric.suffix;
          joola.common.flatGetSet(document, mkey, value);

        }
      }
      if (metric.dependent) {
        delete document[metric.key];
      }
    });
    results.documents[dindex] = document;
    dindex++;
    return cb();
  }, function(err) {



    //sort
    /*results.documents = _.sortBy(results.documents, function (document) {
     if (query.orderby) {
     return document[Object.keys(document)[query.orderby[0]]];
     }
     else
     return document[Object.keys(document)[0]];
     });if (query.orderby && query.orderby[0] === 'timestamp' && query.orderby[1].toLowerCase() === 'desc')
     results.documents = results.documents.reverse();
     else if (query.orderby) {

     results.documents = _.sortBy(results.documents, function (document) {
     return document[query.orderby[0]];
     });
     if (query.orderby && query.orderby[1].toLowerCase() === 'desc')
     results.documents = results.documents.reverse();
     }*/

    var _metrics = [];
    results.metrics.forEach(function(metric, i) {
      if (!metric.dependent)
        _metrics.push(metric);
    });
    results.metrics = _metrics;

    results.uid = results.queryplan.query.uid;
    results.cost = results.queryplan.query.cost;
    results.resultCount = results.documents.length;

    if (results.queryplan.query.timeframe && results.queryplan.query.timeframe.start && results.queryplan.query.timeframe.end) {
      results.queryplan.query.timeframe.start = new Date(results.queryplan.query.timeframe.start).toISOString();
      results.queryplan.query.timeframe.end = new Date(results.queryplan.query.timeframe.end).toISOString();
    }

    results.dimensions.forEach(function(d) {
      if (d.collection && d.collection.key)
        d.collection = d.collection.key;
    });
    results.metrics.forEach(function(m) {
      if (m.collection && m.collection.key)
        m.collection = m.collection.key;
    });
    if (!query.filter)
      query.filter = [];
    query.filter.forEach(function(f, i) {
      if (f[3] === '--forced') {
        query.filter.splice(i, 1);
      }
    });
    results.query = query;
    delete results.queryplan;
    return callback(null, results);
  });

};

manager.sendResults = function(results, callback) {
  return callback(null, results);
};

manager.runningQueries = [];
manager.realtimeQueries = {};

exports.stop = {
  name: "/query/stop",
  description: "",
  inputs: ['querytoken'],
  _outputExample: {},
  _permission: ['query:stop'],
  _dispatch: {
    message: 'query:stop'
  },
  _route: function(req, res) {
    var _params = {};
    Object.keys(req.params).forEach(function(p) {
      if (p != 'resource' && p != 'action')
        _params[p] = req.params[p];
    });

    var querytoken = _params.querytoken;
    joola.logger.debug('Stopping query [' + querytoken + ']');
    var index = manager.runningQueries.indexOf(querytoken);
    manager.runningQueries.splice(index, 1);

    if (manager.realtimeQueries[querytoken]) {
      var sid = res.socket.id;
      delete manager.realtimeQueries[querytoken].sockets[sid];
    } else {
      joola.logger.trace('Failed to locate query [' + querytoken + '].');
    }
    return router.responseSuccess({
      ok: 1
    }, {}, req, res);
  }
};

exports.queryKey = function(query) {
  return joola.common.hash(JSON.stringify(query));
};

manager.query = function(context, query, callback) {
  var d = domain.create();
  var called = false;
  d.on('error', function(err) {
    joola.logger.error('Failed to query, ' + err);
    if (!called) {
      called = true;
      return callback(new Error('Failed to query: ' + err));
    }
  });
  d.run(function() {
    return joola.datastore.providers.default.query(context, query, function(err, results) {
      called = true;
      if (err)
        return callback(err);
      return callback(null, results);
    });
  });
};

exports.fetch = {
  name: "/query/fetch",
  description: "",
  inputs: ['options'],
  _outputExample: {},
  _permission: ['query:fetch'],
  _dispatch: {
    message: 'query:fetch'
  },
  _route: function(req, res) {
    var _token = req.token;
    var lastQueryEndDate;
    var timestampDimension;
    var _params = {
      options: req.parsed.options || req.parsed.payload
    };

    if (!Array.isArray(_params.options))
      _params.options = [_params.options];
    /*
     Object.keys(req.params).forEach(function (p) {
     if (p != 'resource' && p != 'action')
     _params[p] = req.params[p];
     });*/
    var aborted, timerID;

    var queryKey = exports.queryKey(_params.options[0]);

    var request = function(firstRun, callback) {
      //_params.options.timeframe.end.setMilliseconds(0);
      if (!firstRun && lastQueryEndDate && timestampDimension) {
        _params.options[0].timeframe_force_start = lastQueryEndDate;

        switch (_params.options[0].interval) {
          case 'second':
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          case 'minute':
            _params.options[0].timeframe_force_start.setSeconds(0);
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          case 'hour':
            _params.options[0].timeframe_force_start.setMinutes(0);
            _params.options[0].timeframe_force_start.setSeconds(0);
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          case 'day':
            _params.options[0].timeframe_force_start.setHours(0);
            _params.options[0].timeframe_force_start.setMinutes(0);
            _params.options[0].timeframe_force_start.setSeconds(0);
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          case 'month':
            _params.options[0].timeframe_force_start.setDate(1);
            _params.options[0].timeframe_force_start.setHours(0);
            _params.options[0].timeframe_force_start.setMinutes(0);
            _params.options[0].timeframe_force_start.setSeconds(0);
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          case 'year':
            _params.options[0].timeframe_force_start.setMonth(0);
            _params.options[0].timeframe_force_start.setDate(1);
            _params.options[0].timeframe_force_start.setHours(0);
            _params.options[0].timeframe_force_start.setMinutes(0);
            _params.options[0].timeframe_force_start.setSeconds(0);
            _params.options[0].timeframe_force_start.setMilliseconds(0);
            break;
          default:
            break;
        }
      }

      var cb = function(err, result, headers) {
        clearTimeout(timerID);
        if (aborted)
          return;
        if (err)
          if (callback)
            return callback(new router.ErrorTemplate('Failed to route action [' + 'fetch' + ']: ' + (typeof(err) === 'object' ? err.message : err)));
          else
            return router.responseError(500, new router.ErrorTemplate('Failed to route action [' + 'fetch' + ']: ' + (typeof(err) === 'object' ? err.message : err)), req, res);

        if (result[0].query && result[0].query.timeframe && result[0].query.timeframe.end) {
          lastQueryEndDate = new Date(result[0].query.timeframe.end);
        }
        if (callback)
          return callback(null, result);
        else {
          if (result.query && result[0].query.realtimeUID)
            result.realtime = result[0].query.realtimeUID;
          return router.responseSuccess(result, headers, req, res);
        }
      };
      joola.auth.extendToken(req.token, function(err, token) {
        if (!token)
          token = {}; //APITokens
        if (joola.dispatch.enabled)
          joola.dispatch.request(token._ || req.token, 'query:fetch', _params, cb);
        else {
          exports.fetch.run.apply(this, [{
              user: req.user
            },
            _params.options,
            cb
          ]);
        }

        timerID = setTimeout(function() {
          if (callback)
            return callback(new router.ErrorTemplate('Timeout while waiting for [' + 'fetch' + ']'));
          else
            return router.responseError(408, new router.ErrorTemplate('Timeout while waiting for [' + 'fetch' + ']'), req, res);

        }, 15000);
      });
    };

    if (_params.options[0].realtime) {
      if (typeof _params.options[0].realtime === 'boolean') {
        _params.options[0].realtime = {
          enabled: _params.options[0].realtime,
          interval: 1000
        };
      } else if (typeof _params.options[0].realtime === 'object') {
        if (_params.options[0].realtime.interval < 1000)
          return router.responseError(422, new router.ErrorTemplate('Minimal realtime interval is 1000ms'));
      } else {
        _params.options[0].realtime = {
          enabled: false
        };
      }
      if (_params.options[0].realtime.enabled === true) {
        var exist = _.find(manager.realtimeQueries, function(q) {
          return q.key === queryKey;
        });
        if (exist) {
          exist.sockets[res.socket.id] = res;
        } else {
          exist = {
            key: queryKey,
            sockets: {}
          };
          exist.sockets[res.socket.id] = res;
          manager.realtimeQueries[queryKey] = exist;

          _params.options[0].queryKey = queryKey;
          var realtimeUID = queryKey; //joola.common.uuid();
          _params.options[0].realtimeUID = realtimeUID;
          manager.runningQueries.push(realtimeUID);
          var realtime = function() {
            request(false, function(err, result) {
              if (err)
                return;

              var sockets;
              if (manager.realtimeQueries[queryKey]) {
                sockets = manager.realtimeQueries[queryKey].sockets;
                _.each(sockets, function(res) {
                  if (res.socket.disconnected) {
                    joola.logger.trace('Removing socket [' + res.socket.id + '] due to disconnection from realtime query [' + queryKey + '].');
                    delete manager.realtimeQueries[queryKey].sockets[res.socket.id];
                  } else {
                    result.realtime = realtimeUID;
                    return router.responseSuccess(result, {}, req, res);
                  }
                });
              }
              if (!sockets || Object.keys(sockets).length === 0) {
                joola.logger.debug('Stopping realtime query [' + queryKey + '] due to inactivity.');
                delete manager.realtimeQueries[queryKey];
                clearInterval(intervalID);
              }
            });
          };
          var intervalID = setInterval(realtime, _params.options[0].realtime.interval);
        }
      }
    }
    request(true);
  },
  run: function(context, options, callback) {
    callback = callback || function() {};
    var wasArray = true;
    if (!Array.isArray(options)) {
      options = [options];
      wasArray = false;
    }

    async.map(options, function(opt, callback) {
      opt = joola.common.extend({
        ts: {
          start: new Date()
        },
        uid: joola.common.uuid(),
        dimensions: [],
        metrics: [],
        timeframe: 'this_day',
        interval: 'minute',
        realtime: false,
        filter: null,
        dontcache: true //temp until #223 is resolved.
      }, opt);

      //console.log(require('util').inspect(opt, {depth: null, colors: true}));
      manager.parse(context, opt, function(err, query) {
        if (err)
          return callback(err);

        manager.applyFilters(context, query, function(err, query) {
          if (err)
            return callback(err);
          manager.query(context, query, function(err, results) {
            if (err)
              return callback(err);

            results.queryplan.query.ts.end = new Date();
            results.queryplan.query.ts.duration = results.queryplan.query.ts.end.getTime() - results.queryplan.query.ts.start.getTime();
            //console.log('Query duration: ', query.ts.duration);
            if (context.user.workspace != '.joola-stats')
              joola.stats.emit({
                event: 'reads',
                workspace: context.user.workspace,
                username: context.user.username,
                readCount: 1,
                duration: query.ts.duration
              });

            return manager.formatResults(results, callback);
          });
        });
      });

    }, function(err, results) {
      if (err)
        return callback(err);
      if (!wasArray && results.length === 1)
        return callback(null, results[0]);
      else
        return callback(null, results);
    });
  }
};