HansHammel/watchmen

View on GitHub
lib/storage/providers/redis.js

Summary

Maintainability
B
4 hrs
Test Coverage
var util = require('util');
var async = require('async');
var debug = require('debug')('redis-storage');
var redis = require("redis");
var shortid = require('shortid');
var aggregator = require('../../aggregator');
var connectRedis = require('connect-redis');

// Redis key fragments used by this provider.
//
// Despite the "SUFIX" name, SERVICE_KEY_SUFIX is used as a key prefix: each
// service is stored as a JSON string under "service:<id>".
var SERVICE_KEY_SUFIX = "service";
// Key of the set that holds every known service id. It has the same value as
// SERVICE_KEY_SUFIX, but the resulting keys differ ("service" vs "service:<id>").
// NOTE(review): the identical value may be a typo for "services" - changing it
// would orphan existing data, so confirm before touching.
var SERVICES_KEY_SUFIX = "service";
// Appended as "<id>:latency": sorted set of latency samples scored by timestamp.
var LATENCY_KEY_SUFIX = "latency";
// Appended as "<id>:outages:current": JSON string describing the ongoing outage.
var CURRENT_OUTAGE_KEY_SUFIX = "outages:current";
// Appended as "<id>:outages": sorted set of archived outages scored by timestamp.
var OUTAGES_KEY_SUFIX = "outages";
// Appended as "<id>:failurecount": counter incremented via INCR.
var FAILURE_COUNT_SUFIX = "failurecount";

/**
 * Redis-backed storage provider.
 *
 * Creates a redis client and selects the configured database.
 *
 * @param options optional settings object, kept on `this.options`:
 *   - port: redis port, 6379 when falsy
 *   - host: redis host, '127.0.0.1' when falsy
 *   - db:   database index, 0 when falsy
 * @constructor
 */
function StorageRedis(options) {
  var settings = options || {};
  var port = settings.port || 6379;
  var host = settings.host || '127.0.0.1';

  this.options = settings;
  this.redis = redis.createClient(port, host);
  this.redis.select(settings.db || 0);
}

exports = module.exports = StorageRedis;


/**
 * Add service
 *
 * Generates a new short id, stamps `id` and `created` (epoch milliseconds) on
 * the given service object (it is mutated in place), then, in one MULTI,
 * stores the JSON-serialized service and registers the id in the services set.
 *
 * @param service service object to persist
 * @param callback called with (err, id)
 */

StorageRedis.prototype.addService = function (service, callback) {
  var newId = shortid.generate();
  service.id = newId;
  service.created = Date.now();

  var serviceKey = SERVICE_KEY_SUFIX + ':' + newId;
  var batch = this.redis.multi();
  batch.set(serviceKey, JSON.stringify(service));
  batch.sadd(SERVICES_KEY_SUFIX, newId);
  batch.exec(function onSaved(err) {
    callback(err, newId);
  });
};

/**
 * Update service
 *
 * Overwrites the stored JSON for `service.id` and then reads the service back.
 *
 * @param service service object; must carry the `id` it was stored under
 * @param callback called with (err) on failure, otherwise with the result of
 *   getService for that id
 */

StorageRedis.prototype.updateService = function (service, callback) {
  var storage = this;
  var serviceKey = SERVICE_KEY_SUFIX + ':' + service.id;
  var batch = storage.redis.multi();

  batch.set(serviceKey, JSON.stringify(service));
  batch.exec(function onUpdated(err) {
    if (!err) {
      storage.getService(service.id, callback);
      return;
    }
    callback(err);
  });
};

/**
 * Get service
 *
 * @param id service id
 * @param callback called with (err, service); service is the parsed JSON, or
 *   null when the lookup failed or nothing is stored under that id
 */

StorageRedis.prototype.getService = function (id, callback) {
  var serviceKey = SERVICE_KEY_SUFIX + ':' + id;
  this.redis.get(serviceKey, function onReply(err, data) {
    if (err || !data) {
      callback(err, null);
    } else {
      callback(err, JSON.parse(data));
    }
  });
};

/**
 * Delete service
 *
 * Removes, in a single MULTI, the stored service, its entry in the services
 * set, and all reporting data kept for it (current outage, outage history,
 * latency samples and failure counter).
 *
 * @param id service id
 * @param callback passed straight to multi.exec
 */

StorageRedis.prototype.deleteService = function (id, callback) {
  var reportingSuffixes = [
    CURRENT_OUTAGE_KEY_SUFIX,
    OUTAGES_KEY_SUFIX,
    LATENCY_KEY_SUFIX,
    FAILURE_COUNT_SUFIX
  ];
  var batch = this.redis.multi();

  // the service itself
  batch.del(SERVICE_KEY_SUFIX + ':' + id);
  batch.srem(SERVICES_KEY_SUFIX, id);

  // its reporting data
  reportingSuffixes.forEach(function (suffix) {
    batch.del(id + ':' + suffix);
  });

  batch.exec(callback);
};

/**
 * Reset service data
 *
 * Deletes the reporting data of a service (current outage, outage history,
 * latency samples and failure counter) in a single MULTI. The service
 * definition itself is left untouched.
 *
 * @param id service id
 * @param callback passed straight to multi.exec
 */

StorageRedis.prototype.resetService = function (id, callback) {
  var batch = this.redis.multi();

  [
    CURRENT_OUTAGE_KEY_SUFIX,
    OUTAGES_KEY_SUFIX,
    LATENCY_KEY_SUFIX,
    FAILURE_COUNT_SUFIX
  ].forEach(function (suffix) {
    batch.del(id + ':' + suffix);
  });

  batch.exec(callback);
};

/**
 * Get all services
 *
 * Reads every id from the services set and fetches the stored JSON of each
 * one in a single MULTI.
 *
 * @param options currently unused
 * @param callback called with (err) on failure, otherwise (err, services)
 *   where services is an array of parsed service objects, in the order the
 *   ids were returned by SMEMBERS. An id whose service key is missing yields
 *   a null entry (JSON.parse(null) === null).
 */

StorageRedis.prototype.getServices = function (options, callback) {
  var self = this;
  this.redis.smembers(SERVICES_KEY_SUFIX, function (err, ids) {
    if (err) {
      return callback(err);
    }

    var multi = self.redis.multi();
    ids.forEach(function (id) {
      multi.get(SERVICE_KEY_SUFIX + ':' + id);
    });
    multi.exec(function (err, services) {
      // When the transaction fails there is no usable reply list; report the
      // error instead of crashing on `services.map`.
      if (err) {
        return callback(err);
      }
      callback(err, services.map(function (service) {
        return JSON.parse(service);
      }));
    });
  });
};


/**
 * Returns current outage if any for a certain service
 *
 * @param service object carrying the service `id`
 * @param callback called with (err, outage); outage is the parsed JSON stored
 *   under the service's current-outage key, or null when the lookup failed or
 *   no outage is recorded
 */

StorageRedis.prototype.getCurrentOutage = function (service, callback) {
  var outageKey = service.id + ':' + CURRENT_OUTAGE_KEY_SUFIX;
  this.redis.get(outageKey, function onReply(err, raw) {
    if (err) {
      callback(err, null);
    } else {
      // JSON.parse(null) is null, so a missing key yields a null outage
      callback(err, JSON.parse(raw));
    }
  });
};

/**
 * Records the start of an outage
 *
 * Stores the JSON-serialized outage data under the service's current-outage
 * key, replacing whatever was there.
 *
 * @param service object carrying the service `id`
 * @param outageData outage details to serialize
 * @param callback called with (err) only
 */

StorageRedis.prototype.startOutage = function (service, outageData, callback) {
  var outageKey = service.id + ':' + CURRENT_OUTAGE_KEY_SUFIX;
  var payload = JSON.stringify(outageData);

  this.redis.set(outageKey, payload, function onStored(err) {
    callback(err);
  });
};

/**
 * If exists, ends the current outage and saves the details into the outages collection
 *
 * The archived outage gets a `downtime` property (milliseconds elapsed since
 * its `timestamp`). Removing the current outage and adding it to the outages
 * sorted set (scored by `timestamp`) happen in a single MULTI.
 *
 * @param service object carrying the service `id`
 * @param callback called with:
 *   - (err) when reading the current outage fails
 *   - ('missing timestamp') when the stored outage has no truthy timestamp
 *   - (err, outage) once the outage has been archived
 *   - no arguments when there is no current outage
 */

StorageRedis.prototype.archiveCurrentOutageIfExists = function (service, callback) {
  var storage = this;

  storage.getCurrentOutage(service, function onCurrentOutage(err, outage) {
    if (err) {
      return callback(err);
    }
    if (!outage) {
      return callback();
    }
    if (!outage.timestamp) {
      return callback('missing timestamp');
    }

    outage.downtime = Date.now() - outage.timestamp;

    var currentKey = service.id + ':' + CURRENT_OUTAGE_KEY_SUFIX;
    var historyKey = service.id + ':' + OUTAGES_KEY_SUFIX;
    var batch = storage.redis.multi();
    batch.del(currentKey);
    batch.zadd(historyKey, outage.timestamp, JSON.stringify(outage));
    batch.exec(function onArchived(execErr) {
      callback(execErr, outage);
    });
  });
};

/**
 * Get outage history for a service
 *
 * @param service object carrying the service `id`
 * @param timestamp lower score bound; outages are returned from highest
 *   score down to this value
 * @param callback called with (err, outages); outages is an array of parsed
 *   outage objects, or null on error
 */

StorageRedis.prototype.getServiceOutagesSince = function (service, timestamp, callback) {
  var historyKey = service.id + ':' + OUTAGES_KEY_SUFIX;

  this.redis.zrevrangebyscore(historyKey, '+inf', timestamp, function onReply(err, data) {
    if (err) {
      callback(err, null);
      return;
    }

    var outages = [];
    for (var i = 0; i < data.length; i++) {
      outages.push(JSON.parse(data[i]));
    }
    callback(err, outages);
  });
};

/**
 * Records ping latency
 *
 * @param service object carrying the service `id`
 * @param timestamp used as the sorted-set score and as part of the member
 * @param elapsed measured latency
 * @param callback passed straight to zadd
 */

StorageRedis.prototype.saveLatency = function (service, timestamp, elapsed, callback) {
  var latencyKey = service.id + ':' + LATENCY_KEY_SUFIX;
  // ZADD key score member - sorted-set members must be unique, so the member
  // is "timestamp:elapsed" rather than the bare latency value.
  var member = timestamp + ':' + elapsed;

  this.redis.zadd(latencyKey, timestamp, member, callback);
};

/**
 * Get latency since certain time
 *
 * @param service object carrying the service `id`
 * @param timestamp lower score bound; any falsy value falls back to '-inf'
 *   (i.e. the whole history)
 * @param aggregatedBy when truthy, samples are handed to aggregator.aggregate
 *   together with this value and the result is {list, mean}
 * @param callback called with (err) on failure, otherwise (null, result):
 *   - aggregated:      {list: <aggregated data>, mean: <number>}
 *   - not aggregated:  the {arr, mean} object built by parseLatencyDataFromZset
 *   - no samples:      {list: [], mean: 0} when aggregated, [] otherwise
 *   NOTE(review): the non-aggregated shape differs between the empty ([]) and
 *   non-empty ({arr, mean}) cases - confirm callers cope with both.
 */

StorageRedis.prototype.getLatencySince = function (service, timestamp, aggregatedBy, callback) {
  var latencyKey = service.id + ':' + LATENCY_KEY_SUFIX;
  var minScore = timestamp || '-inf';

  this.redis.zrevrangebyscore(latencyKey, '+inf', minScore, 'withscores', function onReply(err, data) {
    if (err) {
      return callback(err);
    }

    if (!data.length) {
      return callback(null, aggregatedBy ? {list: [], mean: 0} : []);
    }

    var parsed = parseLatencyDataFromZset(data);
    if (!aggregatedBy) {
      return callback(null, parsed);
    }

    aggregator.aggregate(parsed.arr, aggregatedBy, function onAggregated(aggregatedData) {
      callback(null, {
        list: aggregatedData,
        mean: parsed.mean
      });
    });
  });
};

/**
 * Clears the failure counter of a service by deleting its key
 * @param service object carrying the service `id`
 * @param cb passed straight to del
 */

StorageRedis.prototype.resetOutageFailureCount = function (service, cb) {
  var counterKey = service.id + ':' + FAILURE_COUNT_SUFIX;
  this.redis.del(counterKey, cb);
};

/**
 * Increments the failure counter of a service by one
 * @param service object carrying the service `id`
 * @param cb passed straight to incr
 */

StorageRedis.prototype.increaseOutageFailureCount = function (service, cb) {
  var counterKey = service.id + ':' + FAILURE_COUNT_SUFIX;
  this.redis.incr(counterKey, cb);
};

/**
 * Issues FLUSHDB on the provider's redis client.
 * Only available on the redis provider.
 * @param callback passed straight to flushdb
 */

StorageRedis.prototype.flush_database = function (callback) {
  var client = this.redis;
  client.flushdb(callback);
};

/**
 * Closes the redis connection by calling quit on the client
 */

StorageRedis.prototype.quit = function () {
  var client = this.redis;
  client.quit();
};

/**
 * Builds a connect-redis session store that reuses this provider's client
 * @param session handed to connect-redis to obtain the store constructor
 * @param options currently unused
 * @returns a new store instance created with {client: this.redis}
 */

StorageRedis.prototype.getSessionStore = function (session, options) {
  var SessionStore = connectRedis(session);
  var storeOptions = {client: this.redis};
  return new SessionStore(storeOptions);
};

/**
 * Converts a flat "withscores" reply, i.e. [member, score, member, score, ...]
 *
 *   ["1428222697560:124", "1428222697560", "123", "1428222692345"]
 *
 * into
 *
 *   {
 *     arr: [{l: 124, t: 1428222697560}, {l: 123, t: 1428222692345}],
 *     mean: 124
 *   }
 *
 * Members are either "timestamp:latency" or, for backward compatibility, the
 * bare latency. Only valid positive latencies are summed for the mean, but the
 * sum is divided by the total number of entries. An empty input yields
 * {arr: [], mean: 0} instead of a NaN mean.
 *
 * @param zset array of strings alternating member and score
 * @returns {{arr: Array, mean: number}}
 */

function parseLatencyDataFromZset(zset) {
  var list = [];
  var accLatency = 0;

  // Walk the reply pair by pair; a dangling member without a score is ignored.
  for (var i = 0; i + 1 < zset.length; i += 2) {
    var lat = zset[i];
    // backward compatibility: only split timestamp:latency if found this format
    if (lat.indexOf(':') !== -1) {
      lat = lat.split(':')[1];
    }
    if (!isNaN(lat) && lat > 0) { // valid positive numbers
      accLatency += (+lat);
    }
    list.push({l: +lat, t: +zset[i + 1]});
  }

  return {
    arr: list, // TODO: change to list
    // guard against 0 / 0 when there are no samples
    mean: list.length ? Math.round(accLatency / list.length) : 0
  };
}