pristineio/wsd

View on GitHub
lib/Receiver.js

Summary

Maintainability
C
1 day
Test Coverage
'use strict';
//
// wsd
// Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
// MIT Licensed
//

var util = require('util');
var Handler = require('./Handler');
var BufferPool = require('./BufferPool');
var bufferUtil = require('./BufferUtil').BufferUtil;
var PerMessageDeflate = require('./PerMessageDeflate');

/**
 * Stateful receiver that collects incoming bytes and dispatches them to
 * frame handlers.
 *
 * Construction creates two buffer pools, initialises the parsing state and
 * immediately starts waiting for a 2-byte header.
 *
 * @param {Object} [extensions] extensions keyed by extension name; an empty
 *   object is used when omitted
 */
function Receiver(extensions) {
  var self = this;

  // Every pool gets its own `prevUsed`, so the two usage histories never mix.
  ['fragmentedBufferPool', 'unfragmentedBufferPool'].forEach(function(x) {
    var prevUsed = -1;
    self[x] = new BufferPool(
      1024,
      // NOTE(review): presumably the grow and shrink sizing callbacks, in
      // that order -- confirm against BufferPool.
      function(db, length) {
        return db.used + length;
      },
      function(db) {
        // Remember the value handed out so the next call averages against
        // it. Previously `prevUsed` was never written, which made the
        // averaging branch unreachable. Rounded up to stay a whole number
        // of bytes.
        prevUsed = prevUsed >= 0 ?
          Math.ceil((prevUsed + db.used) / 2) :
          db.used;
        return prevUsed;
      }
    );
  });

  this.extensions = extensions || {};
  this.state = {
    activeFragmentedOperation: null,
    lastFragment: false,
    masked: false,
    opcode: 0,
    fragmentedOperation: false
  };
  // Bytes received beyond what the current expectation could hold.
  this.overflow = [];
  this.headerBuffer = new Buffer(10);
  this.expectOffset = 0;
  this.expectBuffer = null;
  this.expectHandler = null;
  this.currentMessage = [];
  this.messageHandlers = [];
  // Start by waiting for the first two header bytes.
  this.expect('header', 2, this.processPacket);
  this.dead = false;
  this.processing = false;

  // No-op callbacks; owners replace the ones they care about.
  this.onerror = function() {};
  this.ontext = function() {};
  this.onbinary = function() {};
  this.onclose = function() {};
  this.onping = function() {};
  this.onpong = function() {};
}


/**
 * Feeds a chunk of incoming bytes to the receiver.
 *
 * Bytes are copied into the pending expect buffer; whatever does not fit is
 * parked on the overflow list. Each time the expect buffer is completely
 * filled it is handed to the registered expect handler.
 *
 * @param {Buffer} data chunk to consume
 * @returns {number|undefined} the new overflow length when nothing is being
 *   awaited and the whole chunk was queued; undefined otherwise
 */
Receiver.prototype.add = function(data) {
  var incoming = data.length;
  if(!incoming) {
    return;
  }
  if(!this.expectBuffer) {
    // Nothing is awaited right now: queue the chunk untouched.
    return this.overflow.push(data);
  }

  var room = this.expectBuffer.length - this.expectOffset;
  var taken = incoming < room ? incoming : room;

  data.copy(this.expectBuffer, this.expectOffset, 0, taken);
  this.expectOffset += taken;

  if(taken < incoming) {
    this.overflow.push(data.slice(taken));
  }

  // A handler may install a new expectation that is already satisfied,
  // so keep dispatching until the pending buffer is missing or incomplete.
  for(;;) {
    var filled = this.expectBuffer;
    if(!filled || this.expectOffset !== filled.length) {
      break;
    }
    this.expectBuffer = null;
    this.expectOffset = 0;
    this.expectHandler(filled);
  }
};

/**
 * Marks the receiver as dead and drops its references to buffers, pools,
 * parsing state and event callbacks by setting them to null.
 */
Receiver.prototype.cleanup = function() {
  var self = this;
  self.dead = true;
  [
    'overflow',
    'headerBuffer',
    'expectBuffer',
    'expectHandler',
    'unfragmentedBufferPool',
    'fragmentedBufferPool',
    'state',
    'currentMessage',
    'onerror',
    'ontext',
    'onbinary',
    'onclose',
    'onping',
    'onpong'
  ].forEach(function(field) {
    self[field] = null;
  });
};

/**
 * Registers what the receiver is waiting for next.
 *
 * For a zero length the handler is invoked right away with `null`. Otherwise
 * a target buffer is prepared (a slice of the header buffer for 'header',
 * a pool allocation for anything else), the handler is stored, and the
 * target is pre-filled from bytes already parked on the overflow list.
 *
 * @param {string} type 'header' to read into the header buffer; any other
 *   value allocates from a buffer pool
 * @param {number} length number of bytes to wait for
 * @param {Function} handler called with the filled buffer, or with `null`
 *   when `length` is 0
 * @returns {*} the handler's return value when `length` is 0, otherwise
 *   undefined
 */
Receiver.prototype.expect = function(type, length, handler) {
  if(length === 0) {
    // Invoke with the receiver as `this`, matching how `add` dispatches a
    // filled buffer through `this.expectHandler(...)`; a bare call would
    // drop `this` for unbound methods.
    return handler.call(this, null);
  }
  this.expectBuffer = type === 'header' ?
    this.headerBuffer.slice(this.expectOffset, this.expectOffset + length) :
    this.allocateFromPool(length, this.state.fragmentedOperation);
  this.expectHandler = handler;
  var toRead = length;
  // Drain parked bytes, taking chunks from the end of the overflow list.
  while (toRead > 0 && this.overflow.length > 0) {
    var fromOverflow = this.overflow.pop();
    if(toRead < fromOverflow.length) {
      // Put back the part of the chunk that is not needed yet.
      this.overflow.push(fromOverflow.slice(toRead));
    }
    var read = Math.min(fromOverflow.length, toRead);
    fromOverflow.copy(this.expectBuffer, this.expectOffset, 0, read);
    this.expectOffset += read;
    toRead -= read;
  }
};

/**
 * Requests a buffer of the given length from one of the two pools.
 *
 * @param {number} length size passed to the pool's `get`
 * @param {boolean} isFragmented truthy selects the fragmented pool,
 *   falsy the unfragmented one
 * @returns {*} whatever the selected pool's `get` returns
 */
Receiver.prototype.allocateFromPool = function(length, isFragmented) {
  var pool;
  if(isFragmented) {
    pool = this.fragmentedBufferPool;
  } else {
    pool = this.unfragmentedBufferPool;
  }
  return pool.get(length);
};

/**
 * Interprets the first two header bytes of a frame and hands the frame to
 * a freshly created Handler.
 *
 * From byte 0 it reads the final-fragment bit (0x80), the compressed bit
 * (0x40) and the opcode (low 4 bits); from byte 1 the mask bit (0x80).
 * Every violation is reported through `this.error` with code 1002.
 *
 * @param {Buffer} data buffer whose first two bytes are the frame header
 * @returns {*} the result of `this.error(...)` on a violation, otherwise
 *   undefined
 */
Receiver.prototype.processPacket = function(data) {
  var first = data[0];
  var deflateEnabled = this.extensions[PerMessageDeflate.extensionName];

  // With the deflate extension present the 0x40 bit is allowed to be set.
  var forbiddenBits = deflateEnabled ? 0x30 : 0x70;
  if((first & forbiddenBits) !== 0) {
    return this.error(
      deflateEnabled ?
        'reserved fields (2, 3) must be empty' :
        'reserved fields must be empty',
      1002
    );
  }

  var state = this.state;
  state.lastFragment = (first & 0x80) !== 0;
  state.masked = (data[1] & 0x80) !== 0;
  var compressed = (first & 0x40) !== 0;
  var opcode = first & 0xf;

  if(opcode === 0) {
    // Continuation frame: it inherits the opcode of the open message.
    if(compressed) {
      return this.error(
        'continuation frame cannot have the Per-message Compressed bits', 1002
      );
    }
    state.fragmentedOperation = true;
    state.opcode = state.activeFragmentedOperation;
    if(state.opcode !== 1 && state.opcode !== 2) {
      return this.error(
        'continuation frame cannot follow current opcode', 1002
      );
    }
  } else {
    if(opcode < 3 && !!state.activeFragmentedOperation) {
      return this.error(
        'data frames after the initial data frame must have opcode 0', 1002
      );
    }
    if(opcode >= 8 && compressed) {
      return this.error(
        'control frames cannot have the Per-message Compressed bits', 1002
      );
    }
    state.compressed = compressed;
    state.opcode = opcode;
    state.fragmentedOperation = !state.lastFragment;
    if(!state.lastFragment) {
      // A non-final frame opens a fragmented message.
      state.activeFragmentedOperation = opcode;
    }
  }

  var frameHandler = this.myHandler = new Handler(this);
  if(!(this.state.opcode in frameHandler.opcodes)) {
    return this.error('no handler for opcode ' + this.state.opcode, 1002);
  }
  frameHandler.start(data);
};

/**
 * Finishes the current frame: resets the relevant buffer pool, clears the
 * pending expectation, updates the fragmentation state and starts waiting
 * for the next 2-byte header.
 */
Receiver.prototype.endPacket = function() {
  var state = this.state;

  if(state.fragmentedOperation) {
    // Fragment buffers are only released once the final fragment is in.
    if(state.lastFragment) {
      this.fragmentedBufferPool.reset(false);
    }
  } else {
    this.unfragmentedBufferPool.reset(true);
  }

  this.expectHandler = null;
  this.expectBuffer = null;
  this.expectOffset = 0;

  var messageDone = state.lastFragment &&
    state.opcode === state.activeFragmentedOperation;
  if(messageDone) {
    state.activeFragmentedOperation = null;
  }

  state.lastFragment = false;
  // Fall back to the opcode of a still-open fragmented message, if any.
  state.opcode = state.activeFragmentedOperation || 0;
  state.masked = false;

  this.expect('header', 2, this.processPacket);
};

/**
 * Returns a live receiver to a blank parsing state: fresh state object,
 * both pools reset, no pending expectation, and empty overflow, message
 * and handler queues. Does nothing once the receiver is dead.
 */
Receiver.prototype.reset = function() {
  if(!this.dead) {
    var blank = {};
    blank.activeFragmentedOperation = null;
    blank.lastFragment = false;
    blank.masked = false;
    blank.opcode = 0;
    blank.fragmentedOperation = false;
    this.state = blank;

    this.fragmentedBufferPool.reset(true);
    this.unfragmentedBufferPool.reset(true);

    // Drop whatever was being awaited.
    this.expectOffset = 0;
    this.expectBuffer = this.expectHandler = null;

    // Discard queued input and partially assembled messages.
    this.overflow = [];
    this.currentMessage = [];
    this.messageHandlers = [];
  }
};

/**
 * Applies `bufferUtil.unmask` to a payload when both a mask and a payload
 * are given, then returns the payload in the requested form.
 *
 * @param {Buffer} mask mask passed to `bufferUtil.unmask`; falsy skips it
 * @param {Buffer} buf payload, modified in place; may be falsy
 * @param {boolean} binary truthy returns `buf` itself, falsy returns text
 * @returns {Buffer|string|*} `buf` when `binary` is truthy; otherwise the
 *   payload decoded as UTF-8, or '' when there is no payload
 */
Receiver.prototype.unmask = function(mask, buf, binary) {
  var hasPayload = !!buf;
  if(hasPayload && mask) {
    bufferUtil.unmask(buf, mask);
  }
  if(binary) {
    return buf;
  }
  if(hasPayload) {
    return buf.toString('utf8');
  }
  return '';
};

/**
 * Combines a list of buffers into one newly allocated buffer.
 *
 * @param {Buffer[]} buffers buffers to combine
 * @returns {Buffer} new buffer sized to the summed lengths and filled by
 *   `bufferUtil.merge`
 */
Receiver.prototype.concatBuffers = function(buffers) {
  // Sum the sizes first so the target is allocated once.
  var total = 0;
  var idx = buffers.length;
  while(idx--) {
    total += buffers[idx].length;
  }
  var combined = new Buffer(total);
  bufferUtil.merge(combined, buffers);
  return combined;
};

/**
 * Reports an error: resets the receiver, then notifies `onerror`.
 *
 * A dead receiver is left alone. `cleanup()` sets `onerror` to null, so
 * calling it after cleanup would throw a TypeError even though `reset()`
 * itself already tolerates the dead state.
 *
 * @param {string} reason description of the failure
 * @param {number} protocolErrorCode code forwarded to `onerror`
 * @returns {Receiver} this receiver
 */
Receiver.prototype.error = function(reason, protocolErrorCode) {
  if(this.dead) {
    // Callbacks were released by cleanup(); nobody is left to notify.
    return this;
  }
  this.reset();
  this.onerror(reason, protocolErrorCode);
  return this;
};

/**
 * Runs queued message handlers one at a time, in queue order.
 *
 * The next handler is started only after the previous one has invoked its
 * completion callback. Nothing happens while a handler is still running,
 * when the receiver is dead, or when the queue is empty.
 */
Receiver.prototype.flush = function() {
  var idle = !this.processing && !this.dead;
  if(!idle) {
    return;
  }
  var next = this.messageHandlers.shift();
  if(!next) {
    return;
  }
  this.processing = true;
  next(function done() {
    // Release the lock and move on to the following handler.
    this.processing = false;
    this.flush();
  }.bind(this));
};

/**
 * Passes a message through the per-message deflate extension when it is
 * flagged as compressed; otherwise forwards it unchanged.
 *
 * @param {Buffer} messageBuffer payload to process
 * @param {boolean} fin forwarded to the extension's `decompress`
 * @param {boolean} c truthy when the payload is compressed
 * @param {Function} cb called as `cb(null, buffer)` on success or
 *   `cb(error)` when decompression fails; not called at all if the
 *   receiver is dead by the time decompression completes
 * @returns {*} the result of `cb` for uncompressed input, otherwise
 *   undefined
 */
Receiver.prototype.applyExtensions = function(messageBuffer, fin, c, cb) {
  if(!c) {
    return cb(null, messageBuffer);
  }
  var receiver = this;
  var deflate = this.extensions[PerMessageDeflate.extensionName];
  deflate.decompress(messageBuffer, fin, function(err, inflated) {
    if(receiver.dead) {
      return;
    }
    if(err) {
      // The underlying error is replaced by a generic one.
      return cb(new Error('invalid compressed data'));
    }
    cb(null, inflated);
  });
};

module.exports = Receiver;