pristineio/wsd

View on GitHub
lib/Sender.js

Summary

Maintainability
C
1 day
Test Coverage
'use strict';
//
// wsd
// Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
// MIT Licensed
//

var EventEmitter = require('events').EventEmitter;
var util = require('util');
var ErrorCodes = require('./ErrorCodes');
var bufferUtil = require('./BufferUtil').BufferUtil;
var PerMessageDeflate = require('./PerMessageDeflate');

function Sender(socket, extensions) {
  this.socket = socket;
  this.extensions = extensions || {};
  this.firstFragment = true;
  this.compress = false;
  this.messageHandlers = [];
  this.processing = false;
}

util.inherits(Sender, EventEmitter);

Sender.prototype.close = function(code, data, mask, cb) {
  if(code) {
    if(typeof code !== 'number' || !(code in ErrorCodes)) {
      throw new Error('first argument must be a valid error code number');
    }
  }
  code = code || 1000;
  var dataBuffer = new Buffer(2 + (data ? Buffer.byteLength(data) : 0));
  writeUInt16BE(dataBuffer, code, 0);
  if(dataBuffer.length > 2) {
    dataBuffer.write(data, 2);
  }
  var self = this;
  this.messageHandlers.push(function(callback) {
    self.frameAndSend(0x8, dataBuffer, true, mask);
    callback();
    if(typeof cb === 'function') {
      cb();
    }
  });
  this.flush();
};

['ping', 'pong'].forEach(function(method) {
  Sender.prototype[method] = function(data, options) {
    var frameByte = method === 'ping' ? 0x9 : 0xa;
    var mask = options && options.mask;
    var self = this;
    this.messageHandlers.push(function(cb) {
      self.frameAndSend(frameByte, data || '', true, mask);
      cb();
    });
    this.flush();
  };
});

Sender.prototype.send = function(data, options, cb) {
  var finalFragment = options && options.fin === false ? false : true;
  var mask = options && options.mask;
  var compress = options && options.compress;
  var opcode = options && options.binary ? 2 : 1;
  if(this.firstFragment) {
    this.firstFragment = false;
    this.compress = compress;
  } else {
    opcode = 0;
    compress = false;
  }
  if(finalFragment) {
    this.firstFragment = true;
  }

  var compressFragment = this.compress;

  var self = this;
  this.messageHandlers.push(function(callback) {
    self.applyExtensions(data, finalFragment, compressFragment,
      function(err, data) {
        if(err) {
          if(typeof cb === 'function') {
            cb(err);
          } else {
            this.emit('error', err);
          }
          return;
        }
        self.frameAndSend(opcode, data, finalFragment, mask, compress, cb);
        callback();
      }
    );
  });
  this.flush();
};

Sender.prototype.frameAndSend = function(opcode, data, f, m, c, cb) {
  var finalFragment = f;
  var maskData = m;
  var compressed = c;
  var canModifyData = false;

  if(!data) {
    try {
      this.socket.write(
        new Buffer([
            opcode | (finalFragment ? 0x80 : 0),
            0 | (maskData ? 0x80 : 0)
          ].concat(maskData ? [0, 0, 0, 0] : [])
        ),
        'binary',
        cb
      );
    } catch (e) {
      if(typeof cb === 'function') {
        cb(e);
      } else {
        this.emit('error', e);
      }
    }
    return;
  }

  if(!Buffer.isBuffer(data)) {
    canModifyData = true;
    if(data && (data.byteLength || data.buffer)) {
      data = getArrayBuffer(data);
    } else {
      data = new Buffer(data);
    }
  }

  var dataLength = data.length;
  var dataOffset = maskData ? 6 : 2;
  var secondByte = dataLength;

  if(dataLength >= 65536) {
    dataOffset += 8;
    secondByte = 127;
  } else if(dataLength > 125) {
    dataOffset += 2;
    secondByte = 126;
  }

  var mergeBuffers = dataLength < 32768 || (maskData && !canModifyData);
  var totalLength = mergeBuffers ? dataLength + dataOffset : dataOffset;
  var outputBuffer = new Buffer(totalLength);
  outputBuffer[0] = finalFragment ? opcode | 0x80 : opcode;
  if(compressed) {
    outputBuffer[0] |= 0x40;
  }

  switch(secondByte) {
    case 126:
      writeUInt16BE(outputBuffer, dataLength, 2);
      break;
    case 127:
      writeUInt32BE(outputBuffer, 0, 2);
      writeUInt32BE(outputBuffer, dataLength, 6);
  }

  try {
    if(maskData) {
      outputBuffer[1] = secondByte | 0x80;
      var mask = this._randomMask || (this._randomMask = getRandomMask());
      outputBuffer[dataOffset - 4] = mask[0];
      outputBuffer[dataOffset - 3] = mask[1];
      outputBuffer[dataOffset - 2] = mask[2];
      outputBuffer[dataOffset - 1] = mask[3];
      if(mergeBuffers) {
        bufferUtil.mask(data, mask, outputBuffer, dataOffset, dataLength);
        return this.socket.write(outputBuffer, 'binary', cb);
      }
      bufferUtil.mask(data, mask, data, 0, dataLength);
      this.socket.write(outputBuffer, 'binary');
      return this.socket.write(data, 'binary', cb);
    }
    outputBuffer[1] = secondByte;
    if(mergeBuffers) {
      data.copy(outputBuffer, dataOffset);
      return this.socket.write(outputBuffer, 'binary', cb);
    }
    this.socket.write(outputBuffer, 'binary');
    this.socket.write(data, 'binary', cb);
  } catch(ex) {
    if(typeof cb === 'function') {
      cb(ex);
    } else {
      this.emit('error', ex);
    }
  }
};


Sender.prototype.flush = function() {
  if(this.processing) {
    return;
  }
  var handler = this.messageHandlers.shift();
  if(!handler) {
    return;
  }
  this.processing = true;
  var self = this;
  handler(function() {
    self.processing = false;
    self.flush();
  });
};

Sender.prototype.applyExtensions = function(data, fin, compress, cb) {
  if(compress && data) {
    if((data.buffer || data) instanceof ArrayBuffer) {
      data = getArrayBuffer(data);
    }
    return this.extensions[PerMessageDeflate.extensionName]
      .compress(data, fin, cb);
  }
  cb(null, data);
};

function writeUInt16BE(data, value, offset) {
  data[offset] = (value & 0xff00)>>8;
  data[offset+1] = value & 0xff;
}

function writeUInt32BE(data, value, offset) {
  data[offset] = (value & 0xff000000)>>24;
  data[offset+1] = (value & 0xff0000)>>16;
  data[offset+2] = (value & 0xff00)>>8;
  data[offset+3] = value & 0xff;
}

function getArrayBuffer(data) {
  var array = new Uint8Array(data.buffer || data);
  var l = data.byteLength || data.length;
  var o = data.byteOffset || 0;
  var buffer = new Buffer(l);
  for(var i=0; i<l; ++i) {
    buffer[i] = array[o+i];
  }
  return buffer;
}

function getRandomMask() {
  return new Buffer([
    ~~(Math.random() * 255),
    ~~(Math.random() * 255),
    ~~(Math.random() * 255),
    ~~(Math.random() * 255)
  ]);
}

module.exports = Sender;