freedomjs/freedom

View on GitHub
providers/transport/webrtc/transport.webrtc.js

Summary

Maintainability
B
5 hrs
Test Coverage
/*jslint sloppy:true, node:true*/
/*globals freedom,console,FileReaderSync,exports,Promise:true*/

// For use in unit testing: when the environment has no global Promise but
// does provide a CommonJS-style `require`, fall back to the implementation
// exported by the 'es6-promise' package.
if (typeof Promise === 'undefined' && typeof require === 'function') {
  // The surrounding directives switch lint warning W079 off and back on
  // around the assignment to the global (presumably the "redefinition of a
  // built-in" warning -- confirm against the linter in use).
  /*jslint -W079 */
  Promise = require('es6-promise').Promise;
  /*jslint +W079 */
}


/**
 * Peer 2 Peer transport provider.
 *
 * Creates a freedom 'core.peerconnection' instance, subscribes to its
 * events and initialises the per-tag bookkeeping used for chunked sending
 * and receiving.
 *
 * @constructor
 * @param {Function} dispatchEvent - Function used by this provider to emit
 *     events ('onData', 'onClose').
 */
var WebRTCTransportProvider = function (dispatchEvent) {
  var provider = this;

  provider.dispatchEvent = dispatchEvent;
  provider.name = null;
  provider._setup = false;

  var peer = freedom['core.peerconnection']();
  provider.pc = peer;

  // [peer connection event, name of the handling method], in the order in
  // which the subscriptions are made.
  var subscriptions = [
    ['onReceived', 'onData'],
    ['onClose', 'onClose'],
    ['onOpenDataChannel', 'onNewTag']
  ];
  subscriptions.forEach(function (entry) {
    peer.on(entry[0], provider[entry[1]].bind(provider));
  });

  // Tags (data channel labels) known to this provider.
  provider._tags = [];

  // tag -> boolean; true while chunks of a message are currently being
  // sent out for that tag.
  provider._sending = {};

  // tag -> list of whole outgoing messages waiting to be sent. Messages
  // are sent one at a time so that chunks of two or more messages are
  // never interleaved.
  provider._queuedMessages = {};

  // tag -> chunks of the incoming message received so far. A missing entry
  // means the first chunk of the next message has not arrived yet.
  provider._chunks = {};

  // Messages may be limited to a 16KB length, see
  // http://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-07#section-6.6
  provider._chunkSize = 15000;

  // Maximum number of bytes allowed to queue up inside peerconnection;
  // beyond that the provider queues data itself.
  provider._pcQueueLimit = 250 * 1024;

  // Javascript has trouble representing integers larger than 2^53 exactly.
  provider._maxMessageSize = Math.pow(2, 53);
};

/**
 * STUN server URLs handed to the peer connection by setup().
 */
WebRTCTransportProvider.stun_servers = [
  'stun:stun.l.google.com:19302',
  'stun:stun1.l.google.com:19302',
  'stun:stun2.l.google.com:19302',
  'stun:stun3.l.google.com:19302',
  'stun:stun4.l.google.com:19302'
];

/**
 * Open the underlying peer connection.
 *
 * @param {String} name - Name stored on this provider and passed on to the
 *     peer connection.
 * @param {String} signallingChannelId - A freedom communication channel id
 *     to use to open the peer connection.
 * @param {Function} continuation - Called exactly once: with the value the
 *     peer connection setup resolved to on success, or with
 *     (undefined, {errcode: "FAILED", message: ...}) when setup rejects.
 */
WebRTCTransportProvider.prototype.setup = function(name, signallingChannelId,
                                                   continuation) {
  this.name = name;
  var promise = this.pc.setup(signallingChannelId, name,
                              WebRTCTransportProvider.stun_servers, false);
  // The flag is raised before the promise settles; send() only checks this
  // flag, not the outcome of the setup promise.
  this._setup = true;

  // Success and failure handlers are passed to the same then() so that an
  // exception thrown by the continuation on the success path cannot fall
  // through to the failure handler and invoke the continuation a second time.
  promise.then(function(result) {
    continuation(result);
  }, function(err) {
    console.error('Error setting up peerconnection', err);
    continuation(undefined, {
      "errcode": "FAILED",
      "message": 'Error setting up peerconnection'
    });
  }).catch(function(err) {
    // Only reached when the continuation itself threw.
    console.error('Error in setup continuation', err);
  });
};

/**
 * Send a message over the data channel identified by |tag|, opening the
 * channel first when the tag is not known yet. Messages for one tag are sent
 * strictly one after another so that their chunks never interleave.
 *
 * @param {String} tag - Data channel label to send on.
 * @param {ArrayBuffer} data - Message to send; it is split by _chunk().
 * @param {Function} continuation - Called once the message has been handed
 *     to the peer connection, or with (undefined, {errcode, message}) when
 *     the provider is not set up ("NOTREADY") or when opening the channel or
 *     sending fails ("FAILED").
 * @throws {Error} When called before setup(); the continuation is also
 *     invoked with a "NOTREADY" error in that case.
 */
WebRTCTransportProvider.prototype.send = function(tag, data, continuation) {
  if(!this._setup) {
    continuation(undefined, {
      "errcode": "NOTREADY",
      "message": "send called before setup"
    });
    throw new Error("send called before setup in WebRTCTransportProvider");
  }

  if (this._tags.indexOf(tag) < 0) {
    // Unknown tag: open the channel, then retry the send.
    this.pc.openDataChannel(tag).then(function() {
      // Another send() for the same tag may have registered it while this
      // open was pending. Re-initialising here would clear its "sending"
      // flag and queue and let two messages interleave.
      if (this._tags.indexOf(tag) < 0) {
        this._tags.push(tag);
        this._sending[tag] = false;
        this._queuedMessages[tag] = [];
      }
      this.send(tag, data, continuation);
    }.bind(this), function(err) {
      console.error('Error opening data channel', err);
      continuation(undefined, {
        "errcode": "FAILED",
        "message": 'Error opening data channel'
      });
    });
    return;
  }

  if (this._sending[tag] === true) {
    // A message is currently being sent. Queue this message to prevent
    // message interleaving. Tags registered through onNewTag may not have
    // a queue yet, so create it on demand.
    if (!Array.isArray(this._queuedMessages[tag])) {
      this._queuedMessages[tag] = [];
    }
    this._queuedMessages[tag].push({tag: tag,
                                    data: data,
                                    continuation: continuation});
    return;
  }

  var buffers = this._chunk(data);
  this._sending[tag] = true;

  // Release the tag and start the next queued message, if any. Runs after
  // success and after failure so that one failed message cannot block the
  // queue forever.
  var startNext = function() {
    this._sending[tag] = false;
    var queue = this._queuedMessages[tag];
    if (Array.isArray(queue) && queue.length > 0) {
      var next = queue.shift();
      this.send(next.tag, next.data, next.continuation);
    }
  }.bind(this);

  this._waitSend(tag, buffers).then(function afterSending() {
    startNext();
    continuation();
  }, function sendFailed(err) {
    console.error('Error sending data', err);
    startNext();
    continuation(undefined, {
      "errcode": "FAILED",
      "message": 'Error sending data'
    });
  });
};



/**
 * Split one ArrayBuffer into a list of ArrayBuffers that are at most
 * this._chunkSize bytes long. The first chunk starts with the header
 * produced by this._sizeToBuffer (the total number of bytes in the message)
 * followed by the first bytes of the payload; every further chunk is a
 * plain slice of the payload.
 *
 * @param {ArrayBuffer} data - Data to chunk.
 * @return {Array} - Array of ArrayBuffer objects. With the header removed
 *     from the first one, their concatenation equals the data parameter.
 */
WebRTCTransportProvider.prototype._chunk = function(data) {
  var total = data.byteLength;
  var header = this._sizeToBuffer(total);
  var headerLength = header.byteLength;
  var payload = new Uint8Array(data);

  // Layout of the payload across chunks (offset always points at the first
  // byte that has not been put into a chunk yet):
  // Payload: [------------------------------------------------]
  //           ^ first chunk  ^ second chunk ^ third chunk ...
  var head = new Uint8Array(Math.min(this._chunkSize, total + headerLength));
  head.set(header, 0);

  // subarray() clamps to the payload length, so for short messages this
  // offset may lie past the end of the payload; the loop below is then
  // simply skipped.
  var offset = Math.min(this._chunkSize - headerLength, head.byteLength);
  head.set(payload.subarray(0, offset), headerLength);

  var pieces = [head.buffer];
  for (; offset < total; offset += this._chunkSize) {
    pieces.push(data.slice(offset, offset + this._chunkSize));
  }
  return pieces;
};

/**
 * Send the given chunks over the data channel |tag| while keeping the
 * amount of data buffered inside the peer connection below
 * this._pcQueueLimit. When the peer connection reports too much buffered
 * data, the buffered amount is polled again every 100ms until there is room.
 *
 * @param {String} tag - Data channel label to send on.
 * @param {Array} buffers - Chunks to send, in order. The array is consumed
 *     (emptied) while sending.
 * @return {Promise} Settles once every chunk has been passed to
 *     this.pc.send and those sends have settled.
 */
WebRTCTransportProvider.prototype._waitSend = function(tag, buffers) {
  // The provider is captured in a variable: the callbacks below run as
  // promise/timer callbacks where `this` is not the provider.
  var self = this,
      bufferBound = 0, // upper bound on the # of bytes buffered
      pollDelayMs = 100,
      sendBuffers, checkBufferedAmount;

  // Pass chunks to the peer connection until the estimated buffered amount
  // would exceed the limit or no chunks are left.
  sendBuffers = function() {
    var promises = [];
    while(bufferBound + self._chunkSize <= self._pcQueueLimit &&
          buffers.length > 0) {
      var nextBuffer = buffers.shift();
      promises.push(self.pc.send({"channelLabel": tag,
                                  "buffer": nextBuffer}));
      bufferBound += nextBuffer.byteLength;
    }

    var allSends = Promise.all(promises);
    if (buffers.length === 0) {
      return allSends;
    }
    return allSends.then(checkBufferedAmount);
  };

  // Refresh the estimate from the peer connection; either wait and poll
  // again or continue sending.
  checkBufferedAmount = function() {
    return self.pc.getBufferedAmount(tag).then(function(bufferedAmount) {
      bufferBound = bufferedAmount;
      if (bufferedAmount + self._chunkSize > self._pcQueueLimit) {
        return new Promise(function(resolve) {
          setTimeout(function() {
            resolve(checkBufferedAmount());
          }, pollDelayMs);
        });
      }
      return sendBuffers();
    });
  };

  // Check first, in case there is data in the pc buffer from another message.
  return checkBufferedAmount();
};


/**
 * Forget all per-tag state and close the peer connection.
 *
 * @param {Function} continuation - Called with the value the peer
 *     connection's close() resolved to, or with
 *     (undefined, {errcode: "FAILED", message: ...}) when closing rejects.
 */
WebRTCTransportProvider.prototype.close = function(continuation) {
  // TODO: Close data channels.
  this._tags = [];
  this._sending = {};
  this._queuedMessages = {};
  this._chunks = {};
  this.pc.close().then(continuation, function(err) {
    // Without this handler a failed close would leave the caller waiting
    // forever for its continuation.
    console.error('Error closing peerconnection', err);
    continuation(undefined, {
      "errcode": "FAILED",
      "message": 'Error closing peerconnection'
    });
  });
};

/**
 * Called when the peer-connection receives data; the payload is turned into
 * an ArrayBuffer where necessary and passed on to _handleData.
 *
 * @param {Object} msg - Received message. Its channelLabel names the data
 *     channel; the payload is taken from msg.buffer (used as is) or from
 *     msg.binary (read into an ArrayBuffer with a file reader). Text
 *     messages (msg.text) are not supported and only logged.
 */
WebRTCTransportProvider.prototype.onData = function(msg) {
  var label = msg.channelLabel;

  if (msg.buffer) {
    this._handleData(label, msg.buffer);
    return;
  }

  if (msg.binary) {
    if (typeof FileReaderSync !== 'undefined') {
      // A synchronous reader is available: read and deliver right away.
      var syncReader = new FileReaderSync();
      this._handleData(label, syncReader.readAsArrayBuffer(msg.binary));
      return;
    }
    // Otherwise read asynchronously and deliver from the load callback.
    var deliver = this._handleData.bind(this);
    var reader = new FileReader();
    reader.onload = function(loadEvent) {
      deliver(label, loadEvent.target.result);
    };
    reader.readAsArrayBuffer(msg.binary);
    return;
  }

  if (msg.text) {
    console.error("Strings not supported.");
    return;
  }

  console.error('message called without a valid data field');
};

/**
 * Collect one received chunk for |tag| and emit an 'onData' event once the
 * whole message has arrived.
 *
 * The first chunk of a message starts with an 8 byte header that is decoded
 * with this._bufferToSize and gives the number of payload bytes to expect;
 * all following chunks are pure payload.
 *
 * @param {String} tag - Data channel label the chunk arrived on.
 * @param {ArrayBuffer} buffer - The received chunk.
 */
WebRTCTransportProvider.prototype._handleData = function(tag, buffer) {
  var headerLength = 8;
  var currentTag;

  // An own-property check is required: `tag in this._chunks` is also true
  // for names inherited from Object.prototype (e.g. "toString"), which would
  // treat the first chunk on such a tag as a continuation.
  if (Object.prototype.hasOwnProperty.call(this._chunks, tag)) {
    currentTag = this._chunks[tag];
    currentTag.buffers.push(buffer);
    currentTag.currentByteCount += buffer.byteLength;
  } else {
    if (buffer.byteLength < headerLength) {
      // No complete size header: the message length is unknown, so the
      // chunk cannot be used.
      console.warn("Received first chunk without a complete size header, dropping it");
      return;
    }
    var size = this._bufferToSize(buffer.slice(0, headerLength));
    if (size > this._maxMessageSize) {
      console.warn("Incomming message is larger than maximum message size");
    }
    currentTag = {buffers: [buffer.slice(headerLength)],
                  currentByteCount: buffer.byteLength - headerLength,
                  totalByteCount: size};
    this._chunks[tag] = currentTag;
  }

  if(currentTag.currentByteCount === currentTag.totalByteCount) {
    var returnBuffer = this._assembleBuffers(tag);
    // Drop the reassembly state before notifying listeners, so that a
    // throwing listener cannot leave stale chunks behind for the next
    // message on this tag.
    delete this._chunks[tag];
    this.dispatchEvent('onData', {
      "tag": tag,
      "data": returnBuffer
    });
  } else if(currentTag.currentByteCount > currentTag.totalByteCount) {
    console.warn("Received more bytes for message than expected, something has gone seriously wrong");
    delete this._chunks[tag];
  }
};

/**
 * Handler the constructor subscribes to the peer connection's
 * 'onOpenDataChannel' event. Registers the channel's tag and makes sure the
 * per-tag send state exists, so that send() can queue messages on it.
 *
 * @param {Object} event - Event whose channelId is used as the tag.
 */
WebRTCTransportProvider.prototype.onNewTag = function(event) {
  var tag = event.channelId;
  var hasOwn = Object.prototype.hasOwnProperty;

  // Avoid duplicate entries when the same channel is announced twice or
  // was already registered by send().
  if (this._tags.indexOf(tag) < 0) {
    this._tags.push(tag);
  }
  // Existing state is kept: a send may already be in progress for the tag.
  if (!hasOwn.call(this._sending, tag)) {
    this._sending[tag] = false;
  }
  if (!hasOwn.call(this._queuedMessages, tag)) {
    this._queuedMessages[tag] = [];
  }
};

/**
 * Handler the constructor subscribes to the peer connection's 'onClose'
 * event: all per-tag state is discarded and an 'onClose' event is emitted
 * with a null payload.
 */
WebRTCTransportProvider.prototype.onClose = function() {
  var provider = this;

  // Start from a clean slate: no known tags, nothing in flight, nothing
  // queued and no partially received messages.
  provider._chunks = {};
  provider._queuedMessages = {};
  provider._sending = {};
  provider._tags = [];

  provider.dispatchEvent('onClose', null);
};


/**
 * Encode a message size as 8 bytes, least significant byte first.
 *
 * @param {Number} size - Size to encode.
 * @return {Uint8Array} View over a fresh 8 byte ArrayBuffer holding the
 *     encoded size.
 */
WebRTCTransportProvider.prototype._sizeToBuffer = function(size) {
  var bytes = new Uint8Array(new ArrayBuffer(8));
  var remaining = size;
  var position = 0;

  // Bit shifts overflow for integers with more than 32 bits, so the value
  // is moved to the next byte by division; the mask is only used to pick
  // out the lowest byte.
  while (position < 8) {
    /*jslint bitwise:true*/
    var lowByte = (remaining & 0xff);
    /*jslint bitwise:false*/
    bytes[position] = lowByte;
    remaining = (remaining - lowByte) / 256;
    position += 1;
  }
  return bytes;
};

/**
 * Decode a number from a buffer whose bytes are stored least significant
 * byte first.
 *
 * @param {ArrayBuffer} buffer - Bytes to decode.
 * @return {Number} The decoded value; 0 for an empty buffer.
 */
WebRTCTransportProvider.prototype._bufferToSize = function(buffer) {
  var bytes = new Uint8Array(buffer);
  var total = 0;
  var position = bytes.byteLength;

  // Walk from the most significant (last) byte down to the first one,
  // using multiplication rather than shifts.
  while (position > 0) {
    position -= 1;
    total = (total * 256) + bytes[position];
  }
  return total;
};

/**
 * Reassemble the buffers for the given tag into a single ArrayBuffer object.
 *
 * @param {String} tag - Tag whose entry in this._chunks is assembled.
 * @return {ArrayBuffer} Buffer of totalByteCount bytes holding all chunk
 *     buffers stored for the tag, one after another in arrival order.
 */
WebRTCTransportProvider.prototype._assembleBuffers = function(tag) {
  var pending = this._chunks[tag];
  var pieces = pending.buffers;
  var assembled = new Uint8Array(new ArrayBuffer(pending.totalByteCount));
  var offset = 0;

  for (var i = 0; i < pieces.length; i += 1) {
    var piece = pieces[i];
    assembled.set(new Uint8Array(piece), offset);
    offset += piece.byteLength;
  }
  return assembled.buffer;
};

/** REGISTER PROVIDER **/
// When a global `freedom` exists, hand the provider constructor to it via
// provideAsynchronous.
if (typeof freedom !== 'undefined') {
  freedom().provideAsynchronous(WebRTCTransportProvider);
}

// When an `exports` object exists (e.g. when the file is loaded as a
// CommonJS module), expose the constructor as `provider` together with the
// name 'transport'.
if (typeof exports !== 'undefined') {
  exports.provider = WebRTCTransportProvider;
  exports.name = 'transport';
}