mauritsl/node-castor-client

View on GitHub
classes/Transport.js

Summary

Maintainability
B
4 hrs
Test Coverage
/* jshint -W097 */
"use strict";

var net = require('net');
var Q = require('q');
var Rows = require('./Rows');

/**
 * Transport class
 */
var Transport = function(host, readyCallback) {
  var self = this;
  
  // Used for validating the connection in the connection pool.
  this.ready = false;
  
  // Store incoming traffic in the input buffer.
  this._inputBuffer = new Buffer(0);
  
  // Promises for application requests.
  this._promises = {};
  
  // Current stream number.
  this._streamNumber = 0;
  
  // Queue for new requests waiting for stream number.
  this._streamQueue = [];
  
  // Validate hostname and extract port number (IPv4 or IPv6).
  var port = 9042;
  var parts = host.match(/^(?:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)|\[?([0-9a-z\:]+)\]?)(?:\:([0-9]+))?$/);
  if (parts) {
    host = parts[1] === undefined ? parts[2] : parts[1];
    port = parts[3] === undefined ? port : parseInt(parts[3]);
  }
  else {
    throw new Error("Invalid host or port number");
  }
  
  this._host = host;
  this._port = port;
  this._readyCallback = readyCallback;
  this._lastConnectAttempt = 0;
  this.connect();
};

Transport.prototype.connect = function() {
  var self = this;
  
  if (new Date() - this._lastConnectAttempt < 5000) {
    return;
  }
  this._lastConnectAttempt = new Date();
  
  // Build options for net.connect().
  var options = {
    host: this._host,
    port: this._port,
    allowHalfOpen: true
  };
  
  // Open connection and add event listeners.
  this._socket = net.connect(options).on('connect', function() {
    // Disable Nagle.
    self._socket.setNoDelay(true);
    // Send startup frame.
    self._startup().then(function() {
      self.ready = true;
      if (self._readyCallback !== undefined) {
        self._readyCallback();
      }
    }).fail(function(error) {
      self.destroy();
      throw error;
    });
  }).on('close', function(had_error) {
    // Test this case with: tcpkill -i eth0 port 9042
    setTimeout(function() {
      self.connect();
    }, 5000);
  }).on('data', function(data) {
    self._inputBuffer = Buffer.concat([self._inputBuffer, new Buffer(data)]);
    self._fetchFrames();
  }).on('error', function(error) {
    if (error.code === 'ECONNRESET') {
      // Cassandra sometimes resets idle connections. We simple destroy this
      // transport. This will fail open queries (if any) and automatically
      // trigger the connection pool to open a new connection.
      self.destroy();
    }
    else {
      if (self._socket.destroyed) {
        self.connect();
      }
      throw error;
    }
  });
};

// Define constants.
Transport.ERROR = 0x00;
Transport.STARTUP = 0x01;
Transport.READY = 0x02;
Transport.AUTHENTICATE = 0x03;
Transport.CREDENTIALS = 0x04;
Transport.OPTIONS = 0x05;
Transport.SUPPORTED = 0x06;
Transport.QUERY = 0x07;
Transport.RESULT = 0x08;
Transport.PREPARE = 0x09;
Transport.EXECUTE = 0x0A;
Transport.REGISTER = 0x0B;
Transport.EVENT = 0x0C;

/**
 * Send the startup frame.
 */
Transport.prototype._startup = function() {
  // Create frame body, which is a string map with CQL version.
  var body = new Buffer('....CQL_VERSION..3.0.0');
  body.writeUInt16BE(1, 0);
  body.writeUInt16BE(11, 2);
  body.writeUInt16BE(5, 15);
  
  return this.sendFrame(Transport.STARTUP, body);
};

/**
 * Close connection.
 */
Transport.prototype.destroy = function() {
  var self = this;
  var reconnectInterval = this.ready ? 0 : 5000;
  this.ready = false;
  this._socket.end();
  Object.keys(this._promises).forEach(function(stream) {
    if (typeof self._promises[stream] === 'object') {
      self._promises[stream].reject('Connection closed');
    }
  });
  this._streamQueue.forEach(function(promise) {
    promise.reject('Connection closed');
  });
  setTimeout(function() {
    self.connect();
  }, reconnectInterval);
};

/**
 * Send a frame.
 * 
 * Returns a promise which will be resolved by _fetchFrame when we got a
 * response from the server.
 */
Transport.prototype.sendFrame = function(opcode, body) {
  var self = this;
  var defer = Q.defer();
  this._getStream().then(function(stream) {
    if (typeof opcode === 'undefined') {
      throw new Error('Missing opcode');
    }
    
    var frame = new Buffer(8 + body.length);
    
    // Write frame header.
    frame.writeInt8(0x01, 0); // Version
    frame.writeInt8(0x00, 1); // Flags
    frame.writeInt8(stream, 2);
    frame.writeInt8(opcode, 3);
    frame.writeUInt32BE(body.length, 4);
    
    new Buffer(body).copy(frame, 8);
    self._promises[stream] = defer;
    self._socket.write(frame);
  }).fail(function(error) {
    defer.reject(error);
  });
  return defer.promise;
};

/**
 * Fetch frames from the input buffer.
 * Do nothing if the input buffer doesn't contain enough data.
 */
Transport.prototype._fetchFrames = function() {
  // We need at least 8 bytes for the frame header.
  while (this._inputBuffer.length >= 8) {
    var length = 8 + this._inputBuffer.readUInt32BE(4);
    if (this._inputBuffer.length < length) {
      // This frame is incomplete. This function will be called again when
      // the remainding data comes in.
      return;
    }
    
    var frame = new Buffer(length);
    this._inputBuffer.copy(frame, 0, 0, length);
    
    // Shift off first frame and leave remaining data in input buffer.
    var remaining = new Buffer(this._inputBuffer.length - length);
    this._inputBuffer.copy(remaining, 0, length);
    this._inputBuffer = remaining;
    var stream = frame.readInt8(2);
    var opcode = frame.readUInt8(3);
    var body = new Buffer(length - 8);
    frame.copy(body, 0, 8);
    frame = undefined; // Free mem.
    if (typeof this._promises[stream] !== 'undefined') {
      if (opcode == Transport.ERROR) {
        this._promises[stream].reject(new TransportError(body));
      }
      else {
        this._promises[stream].resolve(body);
      }
      this._releaseStream(stream);
    }
    else {
      // @todo: Handle messages initiated by server
      console.log('unknown stream: ' + stream);
    }
  }
};

Transport.prototype._getStream = function() {
  var defer = new Q.defer();
  if (typeof this._promises[this._streamNumber] === 'undefined') {
    // The sendFrame-function will set this variable, but not directly.
    // Set it to null to indicate that this slot is reserved.
    this._promises[this._streamNumber] = null;
    defer.resolve(this._streamNumber);
  }
  else {
    this._streamQueue.push(defer);
  }
  // Maximum number of concurrent queries. Must be between 0x01 and 0x80.
  // It seems to run smoother at around 0x20 - 0x40 than 0x80. 
  var concurrency = 0x20;
  this._streamNumber = (this._streamNumber + 1) % concurrency;
  return defer.promise;
};

Transport.prototype._releaseStream = function(stream) {
  this._promises[stream] = undefined;
  var queued = this._streamQueue.shift();
  if (typeof queued !== 'undefined') {
    queued.resolve(stream);
  }
};


/**
 * TransportError class.
 */
var TransportError = function(frame) {
  var messageLength = frame.readUInt16BE(4);
  this.code = frame.readUInt32BE(0);
  this.message = frame.slice(6, 6 + messageLength).toString();
};

TransportError.prototype.toString = function() {
  return this.message;
};


module.exports = Transport;