mbroadst/rethunk

View on GitHub
lib/pool_master.js

Summary

Maintainability
F
4 days
Test Coverage
"use strict";
var util = require('util');
var events = require('events');
var Promise = require('bluebird');
var Dequeue = require('double-ended-queue');
var Pool = require('./pool.js');
var helper = require('./helper.js');
var Err = require('./error.js');
var UNKNOWN_POOLS = 'unknownPools';
var SEPARATOR = 'feedSeparator';

function PoolMaster(r, options) {
  var self = this;

  options = options || {};
  var lineLength = options.buffer || 50;

  self._r = r;
  self._line = new Dequeue(lineLength);
  self._pools = {};
  self._pools[UNKNOWN_POOLS] = []; // pools for which we do not know the server'id
  self._healthyPools = [];
  self._healthy = false;
  self._init = false;
  self._index = 0; // next pool to used
  self._indexUnknown = 0; // next unknown pool to used
  self._discovery = (typeof options.discovery === 'boolean') ? options.discovery : false; // Whether the pool master is in discovery mode or not
  //self._refresh = (typeof options.refresh === 'number') ? options.refresh: 1000*60*60; // Refresh rate for the list of servers
  self._options = options;
  self._options.buffer = options.buffer || 50;
  self._options.max = options.max || 1000;
  self._log = helper.createLogger(self, options.silent || false);
  self._draining = false;
  self._numConnections = 0;
  self._numAvailableConnections = 0;
  self._hasPrintWarningLocalhost = false;
  self._feed = null;
  self._consecutiveFails = -1;
  self._timeoutError = options.timeoutError || 1000; // How long should we wait before recreating a connection that failed?
  self._maxExponent = options.maxExponent || 6; // Maximum timeout is 2^maxExponent*timeoutError

  //TODO
  //self._usingPool = true; // If we have used the pool
  self._seed = 0;

  var pool;
  if (Array.isArray(options.servers)) {
    if (options.servers.length > 0) {
      self._servers = options.servers;
      for(var i=0; i<options.servers.length; i++) {
        var settings = self.createPoolSettings(options, options.servers[i], self._log);
        pool = new Pool(self._r, settings);
        self._pools[UNKNOWN_POOLS].push(pool);
        // A pool is considered healthy by default such that people can do
        // var = require(...)(); query.run();
        self._healthyPools.push(pool);
        self.emitStatus();
      }
    }
    else {
      throw new Err.ReqlDriverError('If `servers` is an array, it must contain at least one server');
    }
  } else {
    self._servers = [{
      host: options.host || 'localhost',
      port: options.port || 28015
    }];

    pool = new Pool(self._r, self.createPoolSettings(options, {}, self._log));
    self._pools[UNKNOWN_POOLS].push(pool);
    self._healthyPools.push(pool);
    self.emitStatus();
  }

  // Initialize all the pools - bind listeners
  self._pools[UNKNOWN_POOLS].forEach(function(pool) { self.initPool(pool); });

  if ((self._discovery === true)) {
    self._timeout = setTimeout(function() { self.fetchServers(); }, 0);
  }
}
util.inherits(PoolMaster, events.EventEmitter);

PoolMaster.prototype.getPools = function() {
  var result = [];
  helper.loopKeys(this._pools, function(pools, key) {
    if (key === UNKNOWN_POOLS) {
      for (var i = 0; i < pools[key].length; i++) {
        result.push(pools[key][i]);
      }
    }
    else {
      result.push(pools[key]);
    }
  });
  return result;
};

// Reject all promises in this._line
PoolMaster.prototype._flushErrors = function() {
  while (this._line.length > 0) {
    this._line.shift().reject(new Err.ReqlDriverError('None of the pools have an opened connection and failed to open a new one'));
    this.emit('queueing', this._line.length);
  }
};

PoolMaster.prototype.getConnection = function() {
  var self = this;
  // Find a pool with available connections
  var result;
  for (var i = 0; i < self._healthyPools.length; i++) {
    if (self._index >= self._healthyPools.length) {
      self._index = 0;
    }
    if (self._healthyPools[self._index].getAvailableLength() > 0) {
      result = self._healthyPools[self._index].getConnection();
    }
    self._index++;
    if (self._index === self._healthyPools.length) {
      self._index = 0;
    }
    if (result) {
      return result;
    }
  }
  if (self._healthyPools.length === 0) {
    return new Promise(function(resolve, reject) {
      reject(new Err.ReqlDriverError('None of the pools have an opened connection and failed to open a new one'));
    });
  }
  else {
    // All pool are busy, buffer the request
    return new Promise(function(resolve, reject) {
      self._line.push({
        resolve: resolve,
        reject: reject
      });

      self.emit('queueing', self._line.length);
      // We could add a condition to be less greedy (for early start)
      self._expandAll();
    });

  }
};
PoolMaster.prototype._expandAll = function() {
  for (var i = 0; i < this._healthyPools.length; i++) {
    this._healthyPools[i]._expandBuffer();
  }
};

// Fetch all the servers once
PoolMaster.prototype.handleAllServersResponse = function(servers) {
  var self = this;
  if (self._draining === true) return;

  // Fill all the known server from RethinkDB
  var knownServer = {};
  for (var i = 0; i < servers.length; i++) {
    var server = servers[i];
    knownServer[server.id] = {count: 0, server: server};
    if (self._pools[server.id] === undefined) {
      // We potentially have a new server in the cluster, or we already have a pool for this server
      // in one of the UNKNOWN_POOLS
      var found = false;
      for (var j = 0; j < self._pools[UNKNOWN_POOLS].length; j++) {
        if (found) break;
        var pool = self._pools[UNKNOWN_POOLS][j];
        // If a pool is created with localhost, it will probably match the first server even though it may not the the one
        // So it gets an id
        for (var k = 0; k < server.network.canonical_addresses.length; k++) {
          // Check for the same host (or if they are both localhost) and port
          if (((server.network.canonical_addresses[k].host === pool.options.connection.host) ||
               (server.network.hostname === pool.options.connection.host) ||
              (helper.localhostAliases.hasOwnProperty(server.network.canonical_addresses[k].host) &&
              helper.localhostAliases.hasOwnProperty(pool.options.connection.host))) &&
              (server.network.reql_port === pool.options.connection.port)) {

            self._pools[server.id] = self._pools[UNKNOWN_POOLS].splice(j, 1)[0];
            // We may assign the wrong pool to this server if it's maching on localhost
            if (helper.localhostAliases.hasOwnProperty(server.network.canonical_addresses[k].host)) {
              self._pools[server.id].options.connection.host = helper.getCanonicalAddress(server.network.canonical_addresses).host;
              self._pools[server.id].drainLocalhost();
            }
            found = true;
            break;
          }
        }
      }
      if (found === false) {
        // We just found a new server, let's extract the canonical address and connect to it
        self.createPool(server);
      }
    }
  } // Each server know has a pool

  // Check if we need to remove pools
  helper.loopKeys(self._pools, function(pools, key) { // among the pools with a server id
    if (key !== UNKNOWN_POOLS) {
      if (knownServer.hasOwnProperty(key) === false) {
        self.deletePool(key); // We just found a pool that doesn't map to any known RethinkDB server
      }
      else {
        knownServer[key].count++;
      }
    }
  });

  /* jshint ignore:start */
  for (var z = 0; i < self._pools[UNKNOWN_POOLS].length; z++) {
    // These pools does not match any server returned by RethinkDB.
    var poolToRemove = self._pools[UNKNOWN_POOLS].splice(z, 1)[0];
    self._log('Removing pool connected to: ' + poolToRemove.getAddress());
    poolToRemove.drain()
      .then(function() { pool.removeAllListeners(); })
      .error(function(error) {
        self._log('Pool connected to: ' + self._pools[UNKNOWN_POOLS][z].getAddress() + ' could not be properly drained.');
        self._log(error.message);
        self._log(error.stack);
      });
  }
  /* jshint ignore:end */
};

// Create the settings for a given pool. Merge the global options + the servers's one.
PoolMaster.prototype.createPoolSettings = function(globalOptions, serverOptions, log) {
  var settings = {};
  var numServers = Array.isArray(globalOptions.servers) ? globalOptions.servers.length : 1;
  helper.loopKeys(globalOptions, function(options, key) {
    if ((key === 'buffer') || (key === 'max')) {
      settings[key] = Math.ceil(options[key] / numServers);
      settings[key] = Math.ceil(options[key] / numServers);
    }
    else if (key !== 'servers') {
      settings[key] = options[key];
    }
  });
  if (serverOptions) {
    helper.loopKeys(serverOptions, function(options, key) {
      settings[key] = options[key];
    });
  }
  settings._log = log;
  return settings;
};

// Create a new pool
PoolMaster.prototype.createPool = function(server) {
  var self = this;
  var address = helper.getCanonicalAddress(server.network.canonical_addresses);
  var settings = self.createPoolSettings(self._options, {
    port: server.network.reql_port,
    host: address.host
  }, self._log);
  var pool = new Pool(self._r, settings);
  self._pools[server.id] = pool;
  self.initPool(pool);
  self._healthyPools.push(pool);
  self.emitStatus();
  self.resetBufferParameters();
};

// Delete a known pool
PoolMaster.prototype.deletePool = function(key) {
  var self = this;
  var pool = self._pools[key];
  self._log('Removing pool connected to: ' + pool.getAddress());
  pool.drain().then(function() {
    pool.removeAllListeners();
  }).error(function(error) {
    self._log('Pool connected to: ' + self._pools[key].getAddress() + ' could not be properly drained.');
    self._log(error.message);
    self._log(error.stack);
  });
  delete self._pools[key];
  self.resetBufferParameters();
};

//  Create the feed on server_status and bind the listener to the feed
PoolMaster.prototype.fetchServers = function(useSeeds) {
  var self = this;
  var query = self._r.db('rethinkdb').table('server_status')
      .union([SEPARATOR])
      .union(self._r.db('rethinkdb').table('server_status').changes());

  // In case useSeeds is true, we rotate through all the seeds + the pool master
  var promise;
  if (!useSeeds || self._seed === self._servers.length) {
    if (useSeeds && self._seed === self._servers.length) {
      // We increase the back off only when we went through all the seeds
      self._consecutiveFails++;
    }

    self._seed = 0;
    promise = query.run({cursor: true});
  }
  else {
    var settings = self._servers[self._seed];
    self._seed++;
    promise = self._r.connect(settings).then(function(connection) {
      return query.run(connection, {cursor: true});
    });
  }

  promise.then(function(feed) {
    if (self._draining === true) return feed.close();

    // otherwise
    self._feed = feed;
    var initializing = true;
    var servers = [];
    feed.each(function(err, change) {
      if (err) {
        self._log('The changefeed on server_status returned an error: ' + err.toString());
        // We have to refetch everything as the server that was serving the feed may
        // have died.
        if (!self._draining) {
          setTimeout(function() {
            self.fetchServers();
          }, 0); // Give a timeout to let the driver clean the pools
        }
        return;
      }
      if (initializing === true) {
        if (change === SEPARATOR) {
          initializing = false;
          self.handleAllServersResponse(servers);
          // Rerun the whole query after to make sure that a change did not skip/sneak between the union. As long
          // as RethinkDB does not provide initial results
          setTimeout(function() {
            self._r.db('rethinkdb').table('server_status').run({cursor: false}).then(function(servers) {
              self.handleAllServersResponse(servers);
            }).error(function(error) {
              self._log('Fail to retrieve a second copy of server_status');
              //TODO Retry
            });
          }, 1000);
        }
        else {
          servers.push(change);
        }
        return;
      }

      if (change.new_val !== null && change.old_val === null) {
        // New server
        self.createPool(change.new_val);
      }
      else if (change.new_val === null && change.old_val !== null) {
        // A server was removed
        var server = change.old_val;
        if (self._pools[server.id] !== null) {
          self.deletePool(server.id);
        }
        else {
          var found = false;
          for (var i = 0; i < self._pools[UNKNOWN_POOLS].length; i++) {
            if (((server.network.canonical_addresses[i].host === self._pools[UNKNOWN_POOLS][i].options.connection.host) ||
                (helper.localhostAliases.hasOwnProperty(server.network.canonical_addresses[i].host) && (helper.localhostAliases.hasOwnProperty(self._pools[UNKNOWN_POOLS][i].options.connection.host)))) &&
                (server.network.reql_port === self._pools[UNKNOWN_POOLS][i].options.connection.port)) {
              found = true;

              (function(pool) { // jshint ignore:line
                self._log('Removing pool connected to: ' + pool.getAddress());
                var poolToRemove = self._pools[UNKNOWN_POOLS].splice(i, 1)[0];
                poolToRemove.drain()
                  .then(function() { poolToRemove.removeAllListeners(); })
                  .error(function(error) {
                    if (self._options.silent !== true) {
                      self._log('Pool connected to: ' + poolToRemove.getAddress() + ' could not be properly drained.');
                      self._log(error.message);
                      self._log(error.stack);
                    }
                  });
              })(self._pools[UNKNOWN_POOLS][i]);  // jshint ignore:line
              break;
            }
          }

          if (found === false) {
            self._log('A server was removed but no pool for this server exists...');
          }
        }
      }
      // We ignore this change since this it doesn't affect whether the server
      // is available or not.
      // else if (change.new_val !== null && change.old_val !== null) {}
    });
  }).error(function(error) {
    self._log('Could not retrieve the data from server_status: ' + JSON.stringify(error));

    var timeout;
    if (self._consecutiveFails === -1) {
      timeout = 0;
    }
    else {
      timeout = (1 << Math.min(self._maxExponent, self._consecutiveFails)) * self._timeoutError;
    }
    setTimeout(function() {
      self.fetchServers(true);
    }, timeout);
  });
};

// Bind listeners on the pools
PoolMaster.prototype.initPool = function(pool) {
  var self = this;

  pool.on('size-diff', function(diff) {
    self._numConnections += diff;
    self.emit('size', self._numConnections);
  });
  pool.on('available-size-diff', function(diff) {
    self._numAvailableConnections += diff;
    self.emit('available-size', self._numAvailableConnections);
  });

  pool.on('new-connection', function() {
    if (self._line.length > 0) {
      var p = self._line.shift();
      this.getConnection().then(p.resolve).error(p.reject);
      self.emit('queueing', self._line.length);
    }
  });
  pool.on('not-empty', function() {
    if (self._draining === false) {
      var found = false;
      for (var i = 0; i < self._healthyPools.length; i++) {
        if (self._healthyPools[i] === this) {
          self._healthyPools.length;
          found = true;
          break;
        }
      }
      if (found === false) {
        self._healthyPools.push(this);
        self.emitStatus();
        self.resetBufferParameters();
      }
    }
  });
  pool.on('empty', function() {
    // A pool that become empty is considered unhealthy
    for (var i = 0; i < self._healthyPools.length; i++) {
      if (self._healthyPools[i] === this) {
        self._healthyPools.splice(i, 1);
        self.emitStatus();
        break;
      }
    }
    if (self._healthyPools.length === 0) {
      self._flushErrors();
    }

    self.resetBufferParameters();
  });
  pool.on('draining', function() {
    for (var i = 0; i < self._healthyPools.length; i++) {
      if (self._healthyPools[i] === this) {
        self._healthyPools.splice(i, 1);
        self.emitStatus();
        break;
      }
    }

    if (self._healthyPools === 0) {
      self._flushErrors();
    }
  });
};

PoolMaster.prototype.getNumConnections = function() {
  var sum = 0;
  for (var i = 0; i < this._healthyPools.length; i++) {
    sum += this._healthyPools[i].getLength();
  }
  return sum;
};
PoolMaster.prototype.getNumAvailableConnections = function() {
  var sum = 0;
  for (var i = 0; i < this._healthyPools.length; i++) {
    sum += this._healthyPools[i].getAvailableLength();
  }
  return sum;
};

// Reset buffer and max for each pool
PoolMaster.prototype.resetBufferParameters = function() {
  var max = Math.floor(this._options.max / this._healthyPools.length);
  var buffer = Math.floor(this._options.buffer / this._healthyPools.length);
  for (var i = 0; i < this._healthyPools.length; i++) {
    if (this._healthyPools[i].getLength() > max) {
      this._healthyPools[i]._extraConnections = this._healthyPools[i].getLength() - max;
    }
    else {
      this._healthyPools[i]._extraConnections = 0;
    }
    this._healthyPools[i].options.max = max;
    this._healthyPools[i].options.buffer = buffer;
  }
};

PoolMaster.prototype.getLength = function() {
  return this._numConnections;
};
PoolMaster.prototype.getAvailableLength = function() {
  return this._numAvailableConnections;
};

PoolMaster.prototype.drain = function() {
  this.emit('draining');
  if (this._discovery === true) {
    this._discovery = false;
    if (!!this._feed) this._feed.close();
  }

  this._draining = true;
  var promises = [];
  var pools = this.getPools();
  for (var i = 0; i < pools.length; i++) {
    promises.push(pools[i].drain());
  }

  this._healthyPools = [];
  var self = this;
  return Promise.all(promises)
    .then(function() {
      for (var i = 0; i < pools.length; i++)
        pools[i].removeAllListeners();
    })
    .error(function(error) {
      if (self._options.silent !== true) {
        self._log('Failed to drain all the pools:');
        self._log(error.message);
        self._log(error.stack);
      }
    });
};

// Emit the healthy event with a boolean indicating whether the pool master
// is healthy or not
PoolMaster.prototype.emitStatus = function() {
  var healthy = this._healthyPools.length !== 0;
  if (this._healthy !== healthy) {
    this._healthy = healthy;
    this.emit('healthy', healthy);
  }
};

module.exports = PoolMaster;