pristineio/wsd

View on GitHub
lib/WebSocket.js

Summary

Maintainability
F
6 days
Test Coverage
'use strict';
//
// wsd
// Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
// MIT Licensed
//

var url = require('url');
var util = require('util');
var http = require('http');
var https = require('https');
var crypto = require('crypto');
var stream = require('stream');
var Sender = require('./Sender');
var Receiver = require('./Receiver');
var Extensions = require('./Extensions');
var PerMessageDeflate = require('./PerMessageDeflate');
var EventEmitter = require('events').EventEmitter;

var protocolVersion = 13;
var closeTimeout = 30 * 1000;

function WebSocket(address, protocols, options) {
  if('string' === typeof protocols) {
    protocols = [protocols];
  } else if(!Array.isArray(protocols)) {
    if(typeof protocols === 'object') {
      options = protocols;
      protocols = null;
    } else {
      protocols = [];
    }
  }

  this.socket = null;
  this.closeReceived = false;
  this.bytesReceived = 0;
  this.readyState = null;
  this.supports = {};
  this.extensions = {};

  if(Array.isArray(address)) {
    var args = address.concat(options);
    args.splice(0, 0, this);
    initAsServerClient.apply(this, args);
  } else {
    initAsClient(this, address, protocols, options);
  }
}

util.inherits(WebSocket, EventEmitter);

['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'].forEach(function(state, i) {
  WebSocket.prototype[state] = WebSocket[state] = i;
});

WebSocket.prototype.close = function(code, data) {
  switch(this.readyState) {
    case WebSocket.CLOSED:
      return;
    case WebSocket.CONNECTING:
      this.readyState = WebSocket.CLOSED;
      return;
    case WebSocket.CLOSING:
      if(this.closeReceived && this.isServer) {
        this.terminate();
      }
      return;
  }
  var self = this;
  try {
    this.readyState = WebSocket.CLOSING;
    this.closeCode = code;
    this.closeMessage = data;
    var mask = !this.isServer;
    this.sender.close(code, data, mask, function(err) {
      if(err) {
        self.emit('error', err);
      }
      if(self.closeReceived && self.isServer) {
        self.terminate();
      } else {
        clearTimeout(self.closeTimer);
        self.closeTimer = setTimeout(
          cleanupWebsocketResources.bind(self, self, true),
          closeTimeout
        );
      }
    });
  } catch(ex) {
    this.emit('error', ex);
  }
};

WebSocket.prototype.terminate = function() {
  if(this.readyState === WebSocket.CLOSED) {
    return;
  }
  if(!this.socket && this.readyState === WebSocket.CONNECTING) {
    return cleanupWebsocketResources(this, true);
  }
  this.readyState = WebSocket.CLOSING;
  try {
    this.socket.end();
  } catch(e) {
    return cleanupWebsocketResources(this, true);
  }
  if(this.closeTimer) {
    clearTimeout(this.closeTimer);
  }
  this.closeTimer = setTimeout(
    cleanupWebsocketResources.bind(this, this, true),
    closeTimeout
  );
};

WebSocket.prototype.stream = function stream(options, cb) {
  if(typeof options === 'function') {
    cb = options;
    options = {};
  }

  var self = this;

  if(this.readyState !== WebSocket.OPEN) {
    return cb(new Error('not opened'));
  }

  if(this.queue) {
    return this.queue.push(function () {
      self.stream(options, cb);
    });
  }

  options = options || {};

  if(!options.mask) {
    options.mask = !this.isServer;
  }

  if(!options.compress) {
    options.compress = true;
  }

  if(!this.extensions[PerMessageDeflate.extensionName]) {
    options.compress = false;
  }

  startQueue(this);

  function send(data, final) {
    try {
      if(self.readyState !== WebSocket.OPEN) {
        throw new Error('not opened');
      }
      options.fin = final === true;
      self.sender.send(data, options);
      var ok = !final ? process.nextTick(cb.bind(null, null, send)) :
        executeQueueSends(self);
    } catch(e) {
      cb(e);
    }
  }

  process.nextTick(cb.bind(null, null, send));
};

['pause', 'resume'].forEach(function(method) {
  WebSocket.prototype[method] = function() {
    if(this.readyState !== WebSocket.OPEN) {
      throw new Error('not opened');
    }
    return this.socket[method]();
  };
});

['ping', 'pong'].forEach(function(method) {
  WebSocket.prototype[method] = function(data, options, dontFailWhenClosed) {
    dontFailWhenClosed = true;
    if(this.readyState !== WebSocket.OPEN) {
      if(dontFailWhenClosed) {
        return;
      }
      throw new Error('not opened');
    }
    options = options || {};
    if(!options.mask) {
      options.mask = !this.isServer;
    }
    this.sender[method](data, options);
  };
});


WebSocket.prototype.send = function send(data, options, cb) {
  var self = this;
  if(typeof options === 'function') {
    cb = options;
    options = {};
  }

  if(this.readyState !== WebSocket.OPEN) {
    if(typeof cb === 'function') {
      cb(new Error('not opened'));
    }
    return;
  }

  if(!data) {
    data = '';
  }

  if(this.queue) {
    return this.queue.push(function() {
      self.send(data, options, cb);
    });
  }

  options = options || {};
  options.fin = true;

  if(!options.binary) {
    options.binary = (
      data instanceof ArrayBuffer ||
      data instanceof Buffer ||
      data instanceof Uint8Array ||
      data instanceof Uint16Array ||
      data instanceof Uint32Array ||
      data instanceof Int8Array ||
      data instanceof Int16Array ||
      data instanceof Int32Array ||
      data instanceof Float32Array ||
      data instanceof Float64Array
    );
  }

  if(!options.mask) {
    options.mask = !this.isServer;
  }

  if(!options.compress) {
    options.compress = true;
  }

  if(!this.extensions[PerMessageDeflate.extensionName]) {
    options.compress = false;
  }

  var readable = typeof stream.Readable === 'function' ?
    stream.Readable :
    stream.Stream;

  if(data instanceof readable) {
    startQueue(this);
    sendStream(this, data, options, function(error) {
      process.nextTick(function() {
        executeQueueSends(self);
      });
      if(typeof cb === 'function') {
        cb(error);
      }
    });
  } else {
    this.sender.send(data, options, cb);
  }
};

Object.defineProperty(WebSocket.prototype, 'bufferedAmount', {
  get: function get() {
    return !this.socket ? 0 : this.socket.bufferSize;
  }
});

function buildHostHeader(isSecure, hostname, port) {
  var headerHost = hostname;
  if(hostname) {
    if((isSecure && port !== 443) || (!isSecure && port !== 80)) {
      headerHost = headerHost + ':' + port;
    }
  }
  return headerHost;
}

function initAsServerClient(self, req, socket, upgradeHead, opts) {
  var options = {
    protocolVersion: protocolVersion,
    protocol: null,
    extensions: {}
  };
  if(opts) {
    Object.keys(opts).forEach(function(k) {
      options[k] = opts[k];
    });
  }

  self.protocol = options.protocol;
  self.protocolVersion = options.protocolVersion;
  self.extensions = options.extensions;
  self.supports.binary = (self.protocolVersion !== 'hixie-76');
  self.upgradeReq = req;
  self.readyState = WebSocket.CONNECTING;
  self.isServer = true;

  establishConnection(self, Receiver, Sender, socket, upgradeHead);
}


function initAsClient(self, address, protocols, opts) {
  var options = {
    protocolVersion: protocolVersion,
    perMessageDeflate: true
  };

  if(opts) {
    Object.keys(opts).forEach(function(k) {
      options[k] = opts[k];
    });
  }

  if(options.protocolVersion !== 8 && options.protocolVersion !== 13) {
    throw new Error('unsupported protocol version');
  }

  var serverUrl = url.parse(address);

  var isUnixSocket = serverUrl.protocol === 'ws+unix:';
  if(!serverUrl.host && !isUnixSocket) {
    throw new Error('invalid url');
  }
  var isSecure = serverUrl.protocol === 'wss:' ||
    serverUrl.protocol === 'https:';
  var httpObj = isSecure ? https : http;
  var port = serverUrl.port || (isSecure ? 443 : 80);
  var auth = serverUrl.auth;

  // prepare extensions
  var extensionsOffer = {};
  var perMessageDeflate;
  if(options.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      typeof options.perMessageDeflate === 'object' ?
        options.perMessageDeflate :
        {},
      false
    );
    extensionsOffer[PerMessageDeflate.extensionName] =
      perMessageDeflate.offer();
  }

  // expose state properties
  self.isServer = false;
  self.url = address;
  self.protocolVersion = options.protocolVersion;
  self.supports.binary = self.protocolVersion !== 'hixie-76';

  // begin handshake
  var key = !options.clientKey ?
    new Buffer(options.protocolVersion + '-' + Date.now())
      .toString('base64') :
    options.clientKey;

  var expectedServerKey = crypto
    .createHash('sha1')
    .update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
    .digest('base64');

  var requestOptions = {
    port: port,
    host: serverUrl.hostname,
    headers: {
      'Connection': 'Upgrade',
      'Upgrade': 'websocket',
      'Host': buildHostHeader(isSecure, serverUrl.hostname, port),
      'Sec-WebSocket-Version': options.protocolVersion,
      'Sec-WebSocket-Key': key
    }
  };

  if(auth) {
    requestOptions.headers.Authorization = 'Basic ' +
      new Buffer(auth).toString('base64');
  }

  if(options.protocol) {
    requestOptions.headers['Sec-WebSocket-Protocol'] = options.protocol;
  } else if(protocols) {
    requestOptions.headers['Sec-WebSocket-Protocol'] = protocols.join(',');
    options.protocol = protocols.join(',');
  }

  if(options.host) {
    requestOptions.headers.Host = options.host;
  }

  if(options.headers) {
    for(var header in options.headers) {
       if(options.headers.hasOwnProperty(header)) {
        requestOptions.headers[header] = options.headers[header];
       }
    }
  }

  if(Object.keys(extensionsOffer).length) {
    requestOptions.headers['Sec-WebSocket-Extensions'] =
      Extensions.format(extensionsOffer);
  }

  ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers', 'rejectUnauthorized']
    .forEach(function(k) {
      if(k in options && options[k]) {
        requestOptions[k] = options[k];
      }
    }
  );

  requestOptions.path = serverUrl.path || '/';

  if(isUnixSocket) {
    requestOptions.socketPath = serverUrl.pathname;
  }

  if(options.origin) {
    if(options.protocolVersion < 13) {
      requestOptions.headers['Sec-WebSocket-Origin'] = options.origin;
    } else {
      requestOptions.headers.Origin = options.origin;
    }
  }

  var req = httpObj.request(requestOptions);

  req.on('error', function onerror(error) {
    self.emit('error', error);
    cleanupWebsocketResources(self, error);
  });

  req.once('response', function response(res) {
    var error;
    if(!self.emit('unexpected-response', req, res)) {
      error = new Error('unexpected server response (' + res.statusCode + ')');
      req.abort();
      self.emit('error', error);
    }
    cleanupWebsocketResources(self, error);
  });

  req.once('upgrade', function upgrade(res, socket, upgradeHead) {
    if(self.readyState === WebSocket.CLOSED) {
      self.emit('close');
      self.removeAllListeners();
      return socket.end();
    }

    var serverKey = res.headers['sec-websocket-accept'];
    if(!serverKey || serverKey !== expectedServerKey) {
      self.emit('error', 'invalid server key');
      self.removeAllListeners();
      return socket.end();
    }

    var serverProt = res.headers['sec-websocket-protocol'];
    var protList = (options.protocol || '').split(/, */);
    var protError = null;

    if(!options.protocol && serverProt) {
      protError = 'server sent a subprotocol even though none requested';
    } else if(options.protocol && !serverProt) {
      protError = 'server sent no subprotocol even though requested';
    } else if(serverProt && protList.indexOf(serverProt) === -1) {
      protError = 'server responded with an invalid protocol';
    }

    if(protError) {
      self.emit('error', protError);
      self.removeAllListeners();
      socket.end();
      return;
    } else if(serverProt) {
      self.protocol = serverProt;
    }

    var serverExtensions = Extensions.parse(
      res.headers['sec-websocket-extensions']
    );

    if(perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) {
      try {
        perMessageDeflate.accept(
          serverExtensions[PerMessageDeflate.extensionName]
        );
      } catch(err) {
        self.emit('error', 'invalid extension parameter');
        self.removeAllListeners();
        return socket.end();
      }
      self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
    }
    establishConnection(self, Receiver, Sender, socket, upgradeHead);
    req.removeAllListeners();
    req = null;
  });
  req.end();
  self.readyState = WebSocket.CONNECTING;
}


function establishConnection(self, ReceiverClass, SenderClass, socket, u) {
  var upgradeHead = u;
  var called = false;

  socket.setTimeout(0);
  socket.setNoDelay(true);

  self.receiver = new ReceiverClass(self.extensions);
  self.socket = socket;

  // socket cleanup handlers
  socket.on('end', cleanupWebsocketResources.bind(self, self));
  socket.on('close', cleanupWebsocketResources.bind(self, self));
  socket.on('error', cleanupWebsocketResources.bind(self, self));

  // ensure that the upgradeHead is added to the receiver
  function firstHandler(data) {
    if(called || self.readyState === WebSocket.CLOSED) {
      return;
    }
    called = true;

    socket.removeListener('data', firstHandler);
    socket.on('data', realHandler);

    if(upgradeHead && upgradeHead.length > 0) {
      realHandler(upgradeHead);
      upgradeHead = null;
    }

    if(data) {
      realHandler(data);
    }
  }

  // subsequent packets are pushed straight to the receiver
  function realHandler(data) {
    self.bytesReceived += data.length;
    self.receiver.add(data);
  }

  socket.on('data', firstHandler);

  // if data was passed along with the http upgrade,
  // this will schedule a push of that on to the receiver.
  // this has to be done on next tick, since the caller
  // hasn't had a chance to set event handlers on this client
  // object yet.
  process.nextTick(firstHandler);

  // receiver event handlers
  self.receiver.ontext = function ontext(data, flags) {
    flags = flags || {};
    self.emit('message', data, flags);
  };

  self.receiver.onbinary = function onbinary(data, flags) {
    flags = flags || {};
    flags.binary = true;
    self.emit('message', data, flags);
  };

  self.receiver.onping = function onping(data, flags) {
    flags = flags || {};
    self.pong(data, {
      mask: !self.isServer,
      binary: flags.binary === true
    }, true);
    self.emit('ping', data, flags);
  };

  self.receiver.onpong = function onpong(data, flags) {
    self.emit('pong', data, flags || {});
  };

  self.receiver.onclose = function onclose(code, data, flags) {
    flags = flags || {};
    self.closeReceived = true;
    self.close(code, data);
  };

  self.receiver.onerror = function onerror(reason, errorCode) {
    self.close(!errorCode ? 1002 : errorCode, '');
    self.emit('error', reason, errorCode);
  };

  // finalize the client
  self.sender = new SenderClass(socket, self.extensions);
  self.sender.on('error', function onerror(error) {
    self.close(1002, '');
    self.emit('error', error);
  });

  self.readyState = WebSocket.OPEN;
  self.emit('open');
}

function startQueue(instance) {
  instance.queue = instance.queue || [];
}

function executeQueueSends(instance) {
  var queue = instance.queue;
  if(!queue) {
    return;
  }
  delete instance.queue;
  for (var i = 0, l = queue.length; i < l; ++i) {
    queue[i]();
  }
}

function sendStream(instance, stream, options, cb) {
  stream.on('data', function incoming(data) {
    if(instance.readyState !== WebSocket.OPEN) {
      if(typeof cb === 'function') {
        cb(new Error('not opened'));
      } else {
        delete instance.queue;
        instance.emit('error', new Error('not opened'));
      }
      return;
    }
    options.fin = false;
    instance.sender.send(data, options);
  });

  stream.on('end', function end() {
    if(instance.readyState !== WebSocket.OPEN) {
      if(typeof cb === 'function') {
        cb(new Error('not opened'));
      } else {
        delete instance.queue;
        instance.emit('error', new Error('not opened'));
      }
      return;
    }

    options.fin = true;
    instance.sender.send(null, options);

    if(typeof cb === 'function') {
      cb(null);
    }
  });
}

function cleanupWebsocketResources(self, error) {
  if(self.readyState === WebSocket.CLOSED) {
    return;
  }

  var emitClose = self.readyState !== WebSocket.CONNECTING;
  self.readyState = WebSocket.CLOSED;
  clearTimeout(self.closeTimer);
  self.closeTimer = null;

  if(emitClose) {
    self.emit('close', !error ? self.closeCode || 1000 : 1006,
      self.closeMessage || '');
  }

  if(self.socket) {
    self.socket.on('error', function() {
      try { self.destroy(); } catch(e) {}
    });

    try {
      if(!error) {
        self.socket.end();
      } else {
        self.socket.destroy();
      }
    } catch(e) {
      // Ignore termination errors
    }
    self.socket = null;
  }

  if(self.sender) {
    self.sender.removeAllListeners();
    self.sender = null;
  }

  if(self.receiver) {
    self.receiver.cleanup();
    self.receiver = null;
  }

  self.removeAllListeners();
  self.on('error', function(){});
  delete self.queue;
}

module.exports = WebSocket;