noodlefrenzy/node-amqp10

View on GitHub
lib/connection.js

Summary

Maintainability
F
3 days
Test Coverage
'use strict';
var EventEmitter = require('events').EventEmitter,
    fs = require('fs'),
    util = require('util'),

    debug = require('debug')('amqp10:connection'),
    debugTRACE = require('debug')('amqp10:trace'),
    BufferList = require('bl'),
    StateMachine = require('stately.js'),

    constants = require('./constants'),
    errors = require('./errors'),
    frames = require('./frames'),
    u = require('./utilities'),

    ErrorCondition = require('./types/error_condition'),
    TransportProvider = require('./transport');



/**
 * Connection states, from AMQP 1.0 spec:
 *
 <dl>
 <dt>START</dt>
 <dd><p>In this state a Connection exists, but nothing has been sent or received. This is the
 state an implementation would be in immediately after performing a socket connect or
 socket accept.</p></dd>

 <dt>HDR-RCVD</dt>
 <dd><p>In this state the Connection header has been received from our peer, but we have not
 yet sent anything.</p></dd>

 <dt>HDR-SENT</dt>
 <dd><p>In this state the Connection header has been sent to our peer, but we have not yet
 received anything.</p></dd>

 <dt>OPEN-PIPE</dt>
 <dd><p>In this state we have sent both the Connection header and the open frame, but we have not yet received anything.
 </p></dd>

 <dt>OC-PIPE</dt>
 <dd><p>In this state we have sent the Connection header, the open
 frame, any pipelined Connection traffic, and the close frame,
 but we have not yet received anything.</p></dd>

 <dt>OPEN-RCVD</dt>
 <dd><p>In this state we have sent and received the Connection header, and received an
 open frame from our peer, but have not yet sent an
 open frame.</p></dd>

 <dt>OPEN-SENT</dt>
 <dd><p>In this state we have sent and received the Connection header, and sent an
 open frame to our peer, but have not yet received an
 open frame to our peer, but have not yet received an
 open frame.</p></dd>

 <dt>CLOSE-PIPE</dt>
 <dd><p>In this state we have send and received the Connection header, sent an
 open frame, any pipelined Connection traffic, and the
 close frame, but we have not yet received an
 open frame.</p></dd>

 <dt>OPENED</dt>
 <dd><p>In this state the Connection header and the open frame
 have both been sent and received.</p></dd>

 <dt>CLOSE-RCVD</dt>
 <dd><p>In this state we have received a close frame indicating
 that our partner has initiated a close. This means we will never have to read anything
 more from this Connection, however we can continue to write frames onto the Connection.
 If desired, an implementation could do a TCP half-close at this point to shutdown the
 read side of the Connection.</p></dd>

 <dt>CLOSE-SENT</dt>
 <dd><p>In this state we have sent a close frame to our partner.
 It is illegal to write anything more onto the Connection, however there may still be
 incoming frames. If desired, an implementation could do a TCP half-close at this point
 to shutdown the write side of the Connection.</p></dd>

 <dt>DISCARDING</dt>
 <dd><p>The DISCARDING state is a variant of the CLOSE_SENT state where the
 close is triggered by an error. In this case any incoming frames on
 the connection MUST be silently discarded until the peer's close frame
 is received.</p></dd>

 <dt>END</dt>
 <dd><p>In this state it is illegal for either endpoint to write anything more onto the
 Connection. The Connection may be safely closed and discarded.</p></dd>
 </dl>
 *
 * Connection negotiation state diagram from AMQP 1.0 spec:
 *
 <pre>
              R:HDR +=======+ S:HDR             R:HDR[!=S:HDR]
           +--------| START |-----+    +--------------------------------+
           |        +=======+     |    |                                |
          \\|/                    \\|/   |                                |
      +==========+             +==========+ S:OPEN                      |
 +----| HDR-RCVD |             | HDR-SENT |------+                      |
 |    +==========+             +==========+      |      R:HDR[!=S:HDR]  |
 |   S:HDR |                      | R:HDR        |    +-----------------+
 |         +--------+      +------+              |    |                 |
 |                 \\|/    \\|/                   \\|/   |                 |
 |                +==========+               +-----------+ S:CLOSE      |
 |                | HDR-EXCH |               | OPEN-PIPE |----+         |
 |                +==========+               +-----------+    |         |
 |           R:OPEN |      | S:OPEN              | R:HDR      |         |
 |         +--------+      +------+      +-------+            |         |
 |        \\|/                    \\|/    \\|/                  \\|/        |
 |   +===========+             +===========+ S:CLOSE       +---------+  |
 |   | OPEN-RCVD |             | OPEN-SENT |-----+         | OC-PIPE |--+
 |   +===========+             +===========+     |         +---------+  |
 |  S:OPEN |                      | R:OPEN      \\|/           | R:HDR   |
 |         |       +========+     |          +------------+   |         |
 |         +----- >| OPENED |< ---+          | CLOSE-PIPE |< -+         |
 |                 +========+                +------------+             |
 |           R:CLOSE |    | S:CLOSE              | R:OPEN               |
 |         +---------+    +-------+              |                      |
 |        \\|/                    \\|/             |                      |
 |   +============+          +=============+     |                      |
 |   | CLOSE-RCVD |          | CLOSE-SENT* |< ---+                      |
 |   +============+          +=============+                            |
 | S:CLOSE |                      | R:CLOSE                             |
 |         |         +=====+      |                                     |
 |         +------- >| END |< ----+                                     |
 |                   +=====+                                            |
 |                     /|\                                              |
 |    S:HDR[!=R:HDR]    |                R:HDR[!=S:HDR]                 |
 +----------------------+-----------------------------------------------+

 </pre>
 *
 * R:<b>CTRL</b> = Received <b>CTRL</b>
 *
 * S:<b>CTRL</b> = Sent <b>CTRL</b>
 *
 * Also could be DISCARDING if an error condition triggered the CLOSE
 *
 * @param connectPolicy ConnectPolicy from a Policy instance
 * @constructor
 */
function Connection(connectPolicy) {
  Connection.super_.call(this);

  var options = connectPolicy.options;
  u.assertArguments(options, ['containerId', 'hostname']);
  this.policy = connectPolicy;
  this._transport = null;
  this.connected = false;
  this.connectedTo = null;
  this._buffer = new BufferList();
  this._heartbeatInterval = undefined;

  this.local = { open: new frames.OpenFrame(options) };
  this.remote = {};

  this._sslOptions = null;
  if (options.sslOptions) {
    this._sslOptions = options.sslOptions;

    if (options.sslOptions.keyFile) {
      this._sslOptions.key = fs.readFileSync(options.sslOptions.keyFile);
      delete this._sslOptions.keyFile;
    }
    if (options.sslOptions.certFile) {
      this._sslOptions.cert = fs.readFileSync(options.sslOptions.certFile);
      delete this._sslOptions.certFile;
    }
    if (options.sslOptions.caFile) {
      this._sslOptions.ca = fs.readFileSync(options.sslOptions.caFile);
      delete this._sslOptions.caFile;
    }
  }

  this._lastOutgoing = null; // To track whether heartbeat frame required.
  this._receivedHeader = false;
  this._sessions = {};
  var self = this;
  var stateMachine = {
    'DISCONNECTED': {
      connect: function(address, sasl) {
        self._connect(address, sasl);
        return this.START;
      }
    },
    'START': {
      connected: function(sasl) {
        if (sasl) {
          self.sasl = sasl;
          self.sasl.negotiate(self, self.address, function(e) { self._saslComplete(e); });
          return this.IN_SASL;
        } else {
          self.sendHeader();
          return this.HDR_SENT;
        }
      },
      headerReceived: function() { return this.HDR_RCVD; }
    },
    'IN_SASL': {
      success: function() {
        self.sendHeader();
        return this.HDR_SENT;
      }
    },
    'HDR_RCVD': {
      validVersion: function() {
        self.sendHeader();
        return this.HDR_EXCH;
      },
      invalidVersion: function() {
        self._terminate();
        return this.DISCONNECTING;
      }
    },
    'HDR_SENT': {
      validVersion: function() {
        return this.HDR_EXCH;
      },
      invalidVersion: function() {
        self._terminate();
        return this.DISCONNECTING;
      }
    },
    'HDR_EXCH': {
      sendOpen: function() {
        self._sendOpenFrame();
        return this.OPEN_SENT;
      },
      openReceived: function() {
        return this.OPEN_RCVD;
      }
    },
    'OPEN_RCVD': {
      sendOpen: function() {
        self._sendOpenFrame();
        return this.OPENED;
      }
    },
    'OPEN_SENT': {
      openReceived: function() {
        return this.OPENED;
      },
      sendClose: function() {
        self._terminate();
        return this.DISCONNECTED;
      }
    },
    'OPENED': {
      closeReceived: function() {
        return this.CLOSE_RCVD;
      },
      invalidOptions: function() {
        self._sendCloseFrame(ErrorCondition.ConnectionForced, 'Invalid options');
        return this.CLOSE_SENT;
      },
      sendClose: function() {
        self._sendCloseFrame();
        return this.CLOSE_SENT;
      }
    },
    'CLOSE_RCVD': {
      sendClose: function() {
        self._sendCloseFrame();
        self._terminate();
        return this.DISCONNECTED;
      }
    },
    'CLOSE_SENT': {
      closeReceived: function() {
        self._terminate();
        return this.DISCONNECTED;
      },
      disconnect: function() { return this.DISCONNECTED; }
    },
    'DISCONNECTING': {
      disconnected: function() {
        return this.DISCONNECTED;
      }
    }
  };

  var errorHandler = function(err) { self._processError(err); return this.DISCONNECTED; };
  var terminationHandler = function() { self._terminate(); return this.DISCONNECTED; };
  Object.keys(stateMachine).forEach(function(state) {
    if (state !== 'DISCONNECTED') {
      stateMachine[state].error = errorHandler;
      stateMachine[state].terminated = terminationHandler;
    }
  });

  this.connSM = new StateMachine(stateMachine).bind(function(event, oldState, newState) {
    debug('stateChange:', oldState, '=>', newState, ', reason:', event);
  });
}

util.inherits(Connection, EventEmitter);

// Events
Connection.Connected = 'connection:connected';
Connection.Disconnected = 'connection:disconnected';

// On receipt of a frame not handled internally (e.g. not a BEGIN/CLOSE/SASL).
// Provides received frame as an argument.
Connection.FrameReceived = 'connection:frameReceived';

// Since 'error' events are "special" in Node (as in halt-the-process special),
// using a custom event for errors we receive from the other endpoint. Provides
// received AMQPError as an argument.
Connection.ErrorReceived = 'connection:errorReceived';


/**
 * Open a connection to the given (parsed) address (@see {@link AMQPClient}).
 *
 * @param address   Contains at least protocol, host and port, may contain user/pass, path.
 * @param sasl      If given, contains a "negotiate" method that, given address and a callback, will run through SASL negotiations.
 */
Connection.prototype.open = function(address, sasl) {
  this.connSM.connect(address, sasl);
};

Connection.prototype.close = function() {
  this.connSM.sendClose();
};

Connection.prototype.sendFrame = function(frame, callback) {
  this._lastOutgoing = Date.now();
  frames.writeFrame(frame, this._transport, callback);
};

Connection.prototype.associateSession = function(session) {
  var channel = this._nextChannel();
  this._sessions[channel] = session;
  return channel;
};

Connection.prototype.dissociateSession = function(channel) {
  this._sessions[channel] = undefined;
};

Connection.prototype._nextChannel = function() {
  for (var cid = 1; cid <= this.remote.open.channelMax; ++cid) {
    if (this._sessions[cid] === undefined) return cid;
  }
  throw new errors.OverCapacityError('Out of available ' + this.remote.open.channelMax + ' channels');
};

Connection.prototype._processError = function(err) {
  debug('Error from socket: ' + err);
  this.emit(Connection.ErrorReceived, errors.wrapProtocolError(err));
  this._terminate(err);
};

Connection.prototype._receiveData = function(buffer) {
  debugTRACE('Rx: ' + buffer.toString('hex'));
  this._buffer.append(buffer);
  this._receiveAny();
};

Connection.prototype.sendHeader = function(header) {
  header = header || constants.amqpVersion;
  debug('Sending Header ' + header.toString('hex'));
  this._transport.write(header);
};

Connection.prototype._tryReceiveHeader = function(header) {
  header = header || constants.amqpVersion;
  if (this._buffer.length >= header.length) {
    var serverVersion = this._buffer.slice(0, 8);
    this._buffer.consume(8);

    if (this.sasl) {
      this.sasl.headerReceived(serverVersion);
    } else {
      debug('Server AMQP Version: ' + serverVersion.toString('hex') + ' vs ' + header.toString('hex'));
      if (this.connSM.getMachineState() === 'START') {
        this.connSM.headerReceived();
      }

      if (u.bufferEquals(serverVersion, constants.amqpVersion)) {
        this.connSM.validVersion();
        this.connSM.sendOpen();
      } else {
        var msg = 'Invalid AMQP version received: ';
        if (u.bufferEquals(serverVersion, constants.saslVersion)) {
          msg = 'Credentials Expected - SASL version received: ';
        }
        this.emit(Connection.ErrorReceived, new errors.VersionError(msg + serverVersion.toString('hex')));
        this.connSM.invalidVersion();
      }
      this._receivedHeader = true;
    }

    return true;
  }
  return false;
};

Connection.prototype._receiveAny = function() {
  var frame = null;
  while (true) {
    if (this.sasl && !this.sasl.receivedHeader) {
      if (!this._tryReceiveHeader(constants.saslVersion)) break;
    } else if (!this.sasl && !this._receivedHeader) {
      if (!this._tryReceiveHeader()) break;
    }

    try {
      frame = frames.readFrame(this._buffer);
    } catch (e) {
      var self = this;
      this._sendCloseFrame(ErrorCondition.ConnectionFramingError, e.message, function(err) {  // jshint ignore:line
        self._terminate(true);
      });

      break;
    }

    if (!frame) break;
    if (frame instanceof frames.OpenFrame) {
      this._processOpenFrame(frame);
    } else if (frame instanceof frames.CloseFrame) {
      this._processCloseFrame(frame);
      break;
    } else {
      this.emit(Connection.FrameReceived, frame);
    }
  }
};

Connection.prototype._ensureLocaleCompatibility = function(lhsLocales, rhsLocales) {
  return true;
};

Connection.prototype._sendOpenFrame = function() {
  this.sendFrame(this.local.open);
};

Connection.prototype._processOpenFrame = function(frame) {
  this.connSM.openReceived();
  if (this.connSM.getMachineState() === 'OPEN_RCVD') {
    this.connSM.sendOpen();
  }

  var valid = true;
  this.remote.open = frame;
  if (this.remote.open.outgoingLocales) {
    valid = valid &&
      this._ensureLocaleCompatibility(this.local.open.incomingLocales, this.remote.open.outgoingLocales);
  }
  if (this.remote.open.incomingLocales) {
    valid = valid &&
      this._ensureLocaleCompatibility(this.local.open.outgoingLocales, this.remote.open.incomingLocales);
  }

  if (!valid) {
    return this.connSM.invalidOptions();
  }

  // We're connected.
  debug('Connected with params { maxFrameSize=', this.remote.open.maxFrameSize,
        ', channelMax=', this.remote.open.channelMax, ', idleTimeout=',
        this.local.open.idleTimeout, ', remoteIdleTimeout=', this.remote.open.idleTimeout,
        ' }');
  this.connected = true;
  this._lastOutgoing = Date.now();
  if (this.remote.open.idleTimeout && Number(this.remote.open.idleTimeout) > 0) {
    var maxTimeBeforeHeartbeat = Math.floor(this.remote.open.idleTimeout / 2);
    var timeBetweenHeartbeatChecks = Math.floor(this.remote.open.idleTimeout / 8);

    debug('Setting heartbeat check timeout to ' + timeBetweenHeartbeatChecks);
    var self = this;
    this._heartbeatInterval = setInterval(function() {
      if (self.connected && self._transport && (Date.now() - self._lastOutgoing) > maxTimeBeforeHeartbeat) {
        self.sendFrame(new frames.HeartbeatFrame());
      }
    }, timeBetweenHeartbeatChecks);
  }

  this.emit(Connection.Connected, this);
  if (this._buffer.length) this._receiveAny(); // Might have more frames pending.
};

Connection.prototype._sendCloseFrame = function(condition, description, callback) {
  var error = null;
  if (!!condition) {
    error = { condition: condition, description: description };
  }

  this.sendFrame(new frames.CloseFrame({ error: error }), callback);
};

Connection.prototype._processCloseFrame = function(frame) {
  if (frame.error) {
    this.emit(Connection.ErrorReceived, errors.wrapProtocolError(frame.error));
  }

  this.connSM.closeReceived();
  this.connSM.sendClose();
};

Connection.prototype._connect = function(address, sasl) {
  this.address = address;
  var self = this;
  self.dataHandler = self._receiveHeader;
  self._transport = TransportProvider.getTransportFor(address.protocol);

  if (!self._transport) {
    throw new Error('Invalid protocol, no associated transport. Please use client.register(protocol, transport) to register a new transport.');
  }

  self._transport.connect(address, self._sslOptions);
  self._transport
    .on('connect', function() { self.connSM.connected(sasl); })
    .on('data', function(buf) { self._receiveData(buf); })
    .on('error', function(err) { self.connSM.error(err); })
    .on('end', function() {
      debug('on(end)');
      self.connSM.terminated();
    });
};

Connection.prototype._saslComplete = function(err) {
  this.sasl = null;
  if (err) {
    this.connSM.error(err);
  } else {
    this.connSM.success();
  }
};

Connection.prototype._terminate = function(hasError) {
  var transport = this._transport;
  this._transport = null;
  this._sslOptions = null;
  this.sasl = null;
  this.address = null;
  if (transport) {
    transport.end();
    if (hasError) {
      transport.destroy();
    }
  }

  if (this._heartbeatInterval) {
    clearInterval(this._heartbeatInterval);
    this._heartbeatInterval = undefined;
  }

  this.connected = false;
  this.emit(Connection.Disconnected, this);
};

module.exports = Connection;