mbroadst/rethunk

View on GitHub
long_test/discovery.js

Summary

Maintainability
F
6 days
Test Coverage
var config = require(__dirname+'/../test/config.js');
var longConfig = require(__dirname+'/config.js');

var assert = require('assert');
var util = require(__dirname+'/../test/util/common.js');
var uuid = util.uuid;
var It = util.It;

var spawn = require('child_process').spawn
// Host of the cluster, shared with the regular test suite's configuration.
var host = config.host;

// Settings specific to the long-running tests.
var cmd = longConfig.cmd;                       // command used to spawn a server
var DATA_DIR = longConfig.dataDir;              // prefix of each server's data directory
var INITIAL_OFFSET = longConfig.initialOffset;  // port offset of the first server
var NUM_SERVERS = longConfig.numServers;        // number of servers started initially
var FEED_QUERY = longConfig.feedQuery;          // query text looked up in the jobs table
var BUFFER = longConfig.buffer;                 // `buffer` option handed to the driver
var MAX = longConfig.max;                       // `max` option handed to the driver

// Base ports; the test adds INITIAL_OFFSET to them to reach the first server.
var CLUSTER_PORT = 29015;
var DRIVER_PORT = 28015;

// How often, and for how long at most, the pool master is polled (milliseconds).
var POLL_INTERVAL = 1000;
var MAX_WAIT = 5000;

/**
 * Spawn one server that joins the test cluster through the first server.
 *
 * The port offset is also used as the suffix of the data directory and of the
 * server name, so two servers with different offsets never share either.
 *
 * @param {number} portOffset - Value passed to `--port-offset`.
 * @returns {ChildProcess} The spawned server process.
 */
function startServer(portOffset) {
  // To debug a server, forward child.stdout / child.stderr to console.log here.
  return spawn(cmd, [
    '--port-offset', String(portOffset),
    '--directory', DATA_DIR+portOffset,
    '--server-name', 'rethinkdbdash'+portOffset,
    '--bind', 'all',
    '--join', 'localhost:'+(CLUSTER_PORT+INITIAL_OFFSET)
  ]);
}

/**
 * Kill every server left in `servers`, emptying the array.
 *
 * @param {ChildProcess[]} servers - Running server processes.
 */
function killServers(servers) {
  while (servers.length > 0) {
    servers.shift().kill();
  }
}

/**
 * Poll the pool master every POLL_INTERVAL ms, for at most MAX_WAIT ms, until
 * it reports the expected number of healthy and known pools.
 * Must be called with `yield*` from the test generator.
 *
 * @param {Object} r - Driver instance created with `discovery: true`.
 * @param {number} healthy - Expected number of healthy pools.
 * @param {number} [known=healthy] - Expected number of known pools, not
 *   counting the extra UNKNOWN_POOLS entry.
 * @throws {Error} If the expected state is not reached in time.
 */
function* waitForPools(r, healthy, known) {
  var expectedKnown = (known === undefined ? healthy : known) + 1; // +1 for UNKNOWN_POOLS
  var master;
  for (var waited = 0; waited < MAX_WAIT; waited += POLL_INTERVAL) {
    yield util.sleep(POLL_INTERVAL);
    master = r.getPoolMaster();
    if (master._healthyPools.length === healthy &&
        Object.keys(master._pools).length === expectedKnown) {
      return;
    }
  }
  throw new Error('Could not find '+healthy+' healthy pools and '+expectedKnown+
    ' known pools (including UNKNOWN_POOLS) within '+MAX_WAIT+'ms; found '+
    master._healthyPools.length+' healthy and '+
    Object.keys(master._pools).length+' known.');
}

/**
 * Assert that every healthy pool got its share of BUFFER and MAX.
 *
 * @param {Object} r - Driver instance.
 * @param {number} numPools - Number of pools the totals are divided between.
 */
function assertPoolOptions(r, numPools) {
  for (var i = 0; i < r.getPoolMaster()._healthyPools.length; i++) {
    assert.equal(r.getPool(i).options.buffer, Math.ceil(BUFFER/numPools));
    assert.equal(r.getPool(i).options.max, Math.ceil(MAX/numPools));
  }
}

/**
 * Assert that FEED_QUERY is listed among the queries of the jobs table.
 * Must be called with `yield*` from the test generator.
 *
 * @param {Object} r - Driver instance.
 */
function* assertFeedOpened(r) {
  var queries = yield r.db('rethinkdb').table('jobs')('info')('query').run();
  var found = false;
  for (var i = 0; i < queries.length; i++) {
    if (queries[i] === FEED_QUERY) {
      found = true;
      break;
    }
  }
  assert(found, 'Feed opened');
}

It('Test that pools are created and identified with discovery: true', function* (done) {
  var servers = [];
  var portOffset;
  var i;

  try {
    console.log('');
    console.log('++ Starting servers');
    for (portOffset = INITIAL_OFFSET; portOffset < INITIAL_OFFSET+NUM_SERVERS; portOffset++) {
      servers.push(startServer(portOffset));
    }

    // Give 2 seconds for the servers to start
    yield util.sleep(2000);
    console.log('++ Starting rethinkdbdash');
    var r = require(__dirname+'/../lib')({
      host: host,
      port: DRIVER_PORT+INITIAL_OFFSET,
      discovery: true,
      max: MAX,
      buffer: BUFFER,
    });

    // Every server must be discovered and get an equal share of the options.
    yield* waitForPools(r, NUM_SERVERS);
    assertPoolOptions(r, NUM_SERVERS);

    // Time to close the extra connection;
    yield util.sleep(1000);
    for (i = 0; i < r.getPoolMaster()._healthyPools.length; i++) {
      // We have one change feed opened somewhere
      if (r.getPool(i).getAvailableLength() < Math.ceil(BUFFER/NUM_SERVERS)) {
        throw new Error('Not enough available connections');
      }
    }

    // The discovery changefeed must be running
    yield* assertFeedOpened(r);

    // Kill each server in turn, test, then restart it. `servers` is rotated
    // (shift + push), so it is back in its initial order after the loop.
    for (portOffset = INITIAL_OFFSET; portOffset < INITIAL_OFFSET+NUM_SERVERS; portOffset++) {
      console.log('++ Killing a server');
      servers.shift().kill();

      yield* waitForPools(r, NUM_SERVERS-1);
      assertPoolOptions(r, NUM_SERVERS-1);
      yield* assertFeedOpened(r);

      console.log('++ Restarting a server');
      servers.push(startServer(portOffset));

      yield* waitForPools(r, NUM_SERVERS);
      assertPoolOptions(r, NUM_SERVERS);
    }

    // Add a new server. Its directory and name are derived from its real port
    // offset so they cannot clash with those of the initial servers.
    console.log('++ Adding a new server');
    servers.push(startServer(INITIAL_OFFSET+NUM_SERVERS+1));

    yield* waitForPools(r, NUM_SERVERS+1);
    assertPoolOptions(r, NUM_SERVERS+1);

    console.log('++ Removing the extra server');
    servers.pop().kill();

    yield* waitForPools(r, NUM_SERVERS);

    // Kill all servers except the last one
    console.log('++ Removing all the servers except the last one');
    while (servers.length > 1) {
      servers.shift().kill();
    }

    yield* waitForPools(r, 1);
    assertPoolOptions(r, 1);

    console.log('++ Removing the last server');
    servers.pop().kill();

    // In discovery mode, when the last pool dies, we don't delete it as we may
    // use it to seed things again: 0 healthy pools, but still 1 known pool.
    yield* waitForPools(r, 0, 1);

    // Restart all the servers except the last one
    console.log('++ Restart all the servers except the last one (includes the seed)');
    for (portOffset = INITIAL_OFFSET; portOffset < INITIAL_OFFSET+NUM_SERVERS-1; portOffset++) {
      servers.push(startServer(portOffset));
    }

    yield* waitForPools(r, NUM_SERVERS-1);
    assertPoolOptions(r, NUM_SERVERS-1);

    // Restart the last server
    servers.push(startServer(INITIAL_OFFSET+NUM_SERVERS-1));

    yield* waitForPools(r, NUM_SERVERS);
    assertPoolOptions(r, NUM_SERVERS);

    // Fill the pools, and tests that we use as many connections as allowed
    for (i = 0; i < MAX+1; i++) {
      // These queries take 2 seconds to fail
      r.js('while (true) {}', {timeout: 2}).run().then(function() {}).error(function() {});
    }
    for (i = 0; i < r.getPoolMaster()._healthyPools.length; i++) {
      assert.equal(r.getPool(i).getLength(), Math.ceil(MAX/(NUM_SERVERS)));
    }

    yield r.getPoolMaster().drain();
  }
  catch (err) {
    // Do not leave server processes behind, and report the failure exactly once.
    killServers(servers);
    done(err);
    return;
  }

  killServers(servers);
  yield util.sleep(2000);
  done();
});