lib/PeerConnection.js
/*jshint node: true */
'use strict';
var events = require('events');
var util = require('util');
var Messages = require('./Messages');
var RSVP = require('rsvp');
var PeerConnection = module.exports = function(socket){
events.EventEmitter.call(this);
this.setMaxListeners(0);
Object.defineProperties(this,{
infoHash: {
writable: true,
enumerable: true
},
peerId: {
writable: true,
enumerable: true
},
_socket: {
writable: true,
},
_msgQueue: {
value: [],
writable: true
},
_writeQueue: {
value: []
},
handshakeReceived: {
value: false,
writable: true,
enumerable: true
},
_deathTimer: {
writable: true
},
lastMessageReceivedOn: {
writable: true
}
});
this.bindSocket(socket);
};
util.inherits(PeerConnection, events.EventEmitter);
PeerConnection.prototype.close = function() {
this.clearDeathTimer();
this._socket.end();
};
PeerConnection.prototype.bindSocket = function(socket) {
this._socket = socket;
var self = this;
socket.on('end', function() {
self._socket.end();
});
socket.on('data', function(data){
self.routeData(data);
});
socket.on('close', function(had_error) {
self.clearDeathTimer();
self.emit('close', had_error);
// check for mock socket
if (typeof(self._socket.unref) === "function"){
self._socket.unref();
}
});
socket.on('error', function(err){
self.emit('error', err);
});
};
PeerConnection.prototype.clearDeathTimer = function() {
if (this._deathTimer) {
clearTimeout(this._deathTimer);
}
};
PeerConnection.prototype.resetDeathTimer = function() {
this.lastMessageReceivedOn = new Date();
this.clearDeathTimer();
this._deathTimer = setTimeout(PeerConnection.prototype.close.bind(this), 120000);
};
PeerConnection.prototype.routeData = function(data) {
this.resetDeathTimer();
if (!this.handshakeReceived) {
var handshake = new Messages.Handshake(data);
this.infoHash = handshake.infoHash;
this.peerId = handshake.peerId;
this.handshakeReceived = true;
this.emit('handshake', handshake);
return;
}
// rebuild the message from two or more packets
data = Buffer.concat(this._msgQueue.concat([data]));
// reset the msg queue
this._msgQueue = [];
// validate the lengths of the data
if (!this.isValidData(data)) {
// if not valid, push into queue
this._msgQueue.push(data);
// exit
return;
}
// start at position 0
var cursor = 0;
// array of msgs
var msgs = [];
do {
// read the length again
var msgLen = data.readInt32BE(cursor);
// slice the data
var part = data.slice(cursor, cursor + msgLen + 4);
// fire the event for the part
msgs.push(this.fireEvent(part));
// advance the cursor
cursor += msgLen + 4;
} while (cursor < data.length - 4);
return msgs;
};
PeerConnection.prototype.isValidData = function(data) {
var cursor = 0;
var lenTotal = 0;
var count = 0;
do {
var len = data.readInt32BE(cursor);
lenTotal += len;
count++;
cursor += len + 4;
} while(cursor < data.length - 4);
return ((count * 4) + lenTotal) === data.length;
};
PeerConnection.prototype.fireEvent = function(part) {
var msg = new Messages.KeepAlive(part);
if (msg.len === 0) {
this.emit('keep-alive', msg);
return msg;
}
msg = new Messages.Peer(part);
var func = PeerConnection.prototype.fireEvent[msg.id];
if (func) {
return func.call(this, part);
}
this.emit('unknown', msg);
return msg;
};
PeerConnection.prototype._constructAndFireEvent = function(Constructor, eventName, part) {
var msg = new Constructor(part);
this.emit(eventName, msg);
return msg;
};
PeerConnection.prototype.fireEvent[Messages.Choke.Id] = function(part) {
return this._constructAndFireEvent(Messages.Choke, 'choke', part);
};
PeerConnection.prototype.fireEvent[Messages.Unchoke.Id] = function(part) {
return this._constructAndFireEvent(Messages.Unchoke, 'unchoke', part);
};
PeerConnection.prototype.fireEvent[Messages.Interested.Id] = function(part) {
return this._constructAndFireEvent(Messages.Interested, 'interested', part);
};
PeerConnection.prototype.fireEvent[Messages.NotInterested.Id] = function(part) {
return this._constructAndFireEvent(Messages.NotInterested, 'not-interested', part);
};
PeerConnection.prototype.fireEvent[Messages.Have.Id] = function(part) {
return this._constructAndFireEvent(Messages.Have, 'have', part);
};
PeerConnection.prototype.fireEvent[Messages.Bitfield.Id] = function(part) {
return this._constructAndFireEvent(Messages.Bitfield, 'bitfield', part);
};
PeerConnection.prototype.fireEvent[Messages.Request.Id] = function(part) {
return this._constructAndFireEvent(Messages.Request, 'request', part);
};
PeerConnection.prototype.fireEvent[Messages.Piece.Id] = function(part) {
return this._constructAndFireEvent(Messages.Piece, 'piece', part);
};
PeerConnection.prototype.fireEvent[Messages.Cancel.Id] = function(part) {
return this._constructAndFireEvent(Messages.Cancel, 'cancel', part);
};
PeerConnection.prototype.fireEvent[Messages.Port.Id] = function(part) {
return this._constructAndFireEvent(Messages.Port, 'port', part);
};
PeerConnection.prototype.write = function(msg) {
if (!msg || !msg.toBuffer || typeof(msg.toBuffer) !== 'function') {
throw new TypeError("msg must have toBuffer()");
}
var defer = RSVP.defer();
// throttle writes
this._writeQueue.push({ defered: defer, msg: msg });
process.nextTick(PeerConnection.prototype._write.bind(this));
return defer.promise;
};
PeerConnection.prototype._write = function() {
if (this._writeQueue.length === 0) { return; }
var obj = this._writeQueue.shift();
var self = this;
this._socket.write(obj.msg.toBuffer(), null, function() {
obj.defered.resolve();
process.nextTick(PeerConnection.prototype._write.bind(self));
});
};
PeerConnection.prototype.handshake = function(peerId, infoHash) {
return this.write((new Messages.Handshake()).init({
peerId: peerId,
infoHash: infoHash
}));
};
PeerConnection.prototype.keepAlive = function() {
return this.write(new Messages.KeepAlive());
};
PeerConnection.prototype.choke = function() {
return this.write(new Messages.Choke());
};
PeerConnection.prototype.unchoke = function() {
return this.write(new Messages.Unchoke());
};
PeerConnection.prototype.interested = function() {
return this.write(new Messages.Interested());
};
PeerConnection.prototype.notInterested = function() {
return this.write(new Messages.NotInterested());
};
PeerConnection.prototype.have = function(pieceIndex) {
return this.write((new Messages.Have()).init({
pieceIndex : pieceIndex
}));
};
PeerConnection.prototype.bitfield = function(bitfield) {
return this.write((new Messages.Bitfield()).init({
bitfield : bitfield
}));
};
PeerConnection.prototype._request = function(Constructor, index, begin, length) {
return this.write((new Constructor()).init({
index : index,
begin : begin,
length : length
}));
};
PeerConnection.prototype.request = function(index, begin, length){
return this._request(Messages.Request, index, begin, length);
};
PeerConnection.prototype.cancel = function(index, begin, length){
return this._request(Messages.Cancel, index, begin, length);
};
PeerConnection.prototype.piece = function(index, begin, block){
return this.write((new Messages.Piece()).init({
index : index,
begin : begin,
block : block
}));
};
PeerConnection.prototype.port = function(listenPort) {
return this.write((new Messages.Port()).init({
listenPort : listenPort
}));
};