lib/WebSocketServer.js
'use strict';
//
// wsd
// Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
// MIT Licensed
//
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var http = require('http');
var crypto = require('crypto');
var WebSocket = require('./WebSocket');
var Extensions = require('./Extensions');
var PerMessageDeflate = require('./PerMessageDeflate');
var url = require('url');
/**
 * WebSocket server that either creates its own HTTP server (`options.port`),
 * attaches to an existing one (`options.server`), or runs without one
 * (`options.noServer`, in which case the caller invokes `handleUpgrade`).
 *
 * Emits `listening` and `error` (both forwarded from the HTTP server) and,
 * for every completed handshake, `'connection' + req.url` followed by
 * `connection`, each with the new client as the only argument.
 *
 * @param {Object} options
 * @param {Number} [options.port] - port to listen on; 0 is accepted and
 *   forwarded to `listen()` like any other port.
 * @param {String} [options.host] - forwarded to `listen()` with `port`.
 * @param {Object} [options.server] - existing HTTP server to attach to.
 * @param {Boolean} [options.noServer] - skip HTTP server setup entirely.
 * @param {String} [options.path] - only handle upgrades for this pathname.
 * @param {Function} [callback] - forwarded to `listen()` when `port` is used.
 * @throws {TypeError} if none of `port`, `server` or `noServer` is supplied.
 * @throws {Error} if `path` is already claimed on `options.server`.
 */
function WebSocketServer(options, callback) {
  // Treat a missing options object like an empty one so the caller gets the
  // descriptive TypeError below instead of a property-access failure.
  options = options || {};

  // NOTE(review): compression is unconditionally switched on here, which
  // overwrites whatever `perMessageDeflate` value the caller passed. Left
  // unchanged to preserve current behaviour - confirm this is intended.
  options.perMessageDeflate = true;

  if (!('port' in options) && !('server' in options) && !options.noServer) {
    throw new TypeError('`port` or a `server` must be provided');
  }

  var self = this;

  // Port 0 is falsy but is still a port the caller asked for, so it has to be
  // tested for explicitly; a plain truthiness check would silently create no
  // server at all.
  var hasPort = Boolean(options.port) || options.port === 0;

  if (hasPort) {
    // Plain HTTP requests to a WebSocket-only port get "426 Upgrade Required".
    this.server = http.createServer(function(req, res) {
      var body = http.STATUS_CODES[426];
      res.writeHead(426, {
        'Content-Length': body.length,
        'Content-Type': 'text/plain'
      });
      res.end(body);
    });
    this.server.allowHalfOpen = false;
    this.server.listen(options.port, options.host, callback);
    // Only servers created here are closed by close(); an attached server
    // stays under its owner's control.
    this.closeServer = function() {
      if (self.server) {
        self.server.close();
      }
    };
  } else if (options.server) {
    this.server = options.server;
    if (options.path) {
      // Claimed paths are recorded on the shared HTTP server object so that
      // two WebSocketServer instances cannot register the same path.
      if (this.server.webSocketPaths &&
          options.server.webSocketPaths[options.path]) {
        throw new Error('two instances of WebSocketServer cannot listen on ' +
                        'the same http server path');
      }
      if (typeof this.server.webSocketPaths !== 'object') {
        this.server.webSocketPaths = {};
      }
      this.server.webSocketPaths[options.path] = 1;
    }
  }

  if (this.server) {
    this.server.once('listening', function() {
      self.emit('listening');
    });
    this.server.on('error', function(error) {
      self.emit('error', error);
    });
    this.server.on('upgrade', function(req, socket, upgradeHead) {
      // Hand handleUpgrade its own copy of the head bytes. Buffer.from()
      // replaces the deprecated `new Buffer(size)` + copy() pair.
      var head = Buffer.from(upgradeHead);
      req.query = url.parse(req.url, true).query;
      self.handleUpgrade(req, socket, head, function(client) {
        self.emit('connection' + req.url, client);
        self.emit('connection', client);
      });
    });
  }

  this.options = options;
  this.path = options.path;
  // Connected clients, keyed by the client key chosen in handleUpgrade.
  this.clients = {};
}
util.inherits(WebSocketServer, EventEmitter);
/**
 * Shuts the WebSocket server down: terminates every tracked client, releases
 * this instance's path claim on a shared HTTP server and, when this instance
 * created the HTTP server itself (`closeServer` is set), closes it.
 *
 * The `server` reference is dropped afterwards even if one of those steps
 * throws; the exception itself still propagates to the caller.
 */
WebSocketServer.prototype.close = function() {
  var self = this;
  try {
    Object.keys(self.clients).forEach(function(key) {
      // The key list is a snapshot; skip entries that were removed from
      // `clients` while earlier ones were being terminated.
      if (key in self.clients) {
        self.clients[key].terminate();
      }
    });

    // `server` is undefined in noServer mode and after a previous close(),
    // so it must be checked before reading `webSocketPaths` off it.
    if (self.path && self.server && self.server.webSocketPaths) {
      delete self.server.webSocketPaths[self.path];
      // Remove the bookkeeping object once the last path is released.
      if (Object.keys(self.server.webSocketPaths).length === 0) {
        delete self.server.webSocketPaths;
      }
    }

    if (self.closeServer) {
      self.closeServer();
    }
  } finally {
    delete self.server;
  }
};
/**
 * Performs the server side of the WebSocket opening handshake on an upgraded
 * socket and, on success, registers the new client in `this.clients`.
 *
 * Requests for a different pathname than `options.path` are left untouched
 * (no response, socket not destroyed). Every other rejection goes through
 * `abortConnection`, which answers with an HTTP error and destroys the socket.
 *
 * @param {Object} req - HTTP upgrade request (`url`, `headers`, `connection`).
 * @param {Object} socket - socket the request arrived on.
 * @param {Buffer} u - bytes already read past the request head; handed to the
 *   WebSocket constructor unchanged.
 * @param {Function} cb - invoked with the new WebSocket once the `101`
 *   response has been written and the client is registered.
 */
WebSocketServer.prototype.handleUpgrade = function(req, socket, u, cb) {
  var self = this;
  var upgradeHead = u;

  if (this.options.path) {
    var requested = url.parse(req.url);
    if (requested && requested.pathname !== this.options.path) {
      return;
    }
  }

  if (!req.headers.upgrade ||
      req.headers.upgrade.toLowerCase() !== 'websocket') {
    return abortConnection(socket, 400);
  }

  // Destroy the socket on any error during the handshake; the listener is
  // removed again once the client has been handed to `cb`.
  var errorHandler = function() {
    try {
      socket.destroy();
    } catch (e) {}
  };
  socket.on('error', errorHandler);

  // verify key presence
  if (!req.headers['sec-websocket-key']) {
    return abortConnection(socket, 400);
  }

  // verify version - explicit radix so that only decimal values are accepted
  var version = parseInt(req.headers['sec-websocket-version'], 10);
  if (version !== 13 && version !== 8) {
    return abortConnection(socket, 400);
  }

  // requested subprotocols (raw header value, may be undefined)
  var protocols = req.headers['sec-websocket-protocol'];

  // versions below 13 carry the origin in a differently named header
  var origin = version < 13 ?
    req.headers['sec-websocket-origin'] :
    req.headers.origin;

  // handle extensions offer
  var extensionsOffer = Extensions.parse(
    req.headers['sec-websocket-extensions']
  );

  // Final handshake step: build and send the 101 response, create the
  // WebSocket and register it.
  var completeHybiUpgrade2 = function(protocol) {
    var originalKey = req.headers['sec-websocket-key'];
    // Registry key for this client; defaults to the handshake key and can be
    // overridden through an `x-wsd-client-key` response header (see below).
    var clientKey = originalKey;
    var key = crypto.createHash('sha1')
      .update(originalKey + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
      .digest('base64');

    var headers = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      'Sec-WebSocket-Accept: ' + key
    ];
    if (protocol) {
      headers.push('Sec-WebSocket-Protocol: ' + protocol);
    }

    var extensions = {};
    try {
      extensions = acceptExtensions(self.options, extensionsOffer);
    } catch (err) {
      return abortConnection(socket, 400);
    }
    if (Object.keys(extensions).length) {
      var serverExtensions = {};
      Object.keys(extensions).forEach(function(token) {
        serverExtensions[token] = [extensions[token].params];
      });
      headers.push('Sec-WebSocket-Extensions: ' +
                   Extensions.format(serverExtensions));
    }

    // Echo the caller's credentials as an `authorization` response header:
    // the `token` query parameter wins over the request header.
    var parsedUrl = url.parse(req.url, true);
    if (parsedUrl.query.token) {
      var bearerHeader = 'authorization: bearer ' + parsedUrl.query.token;
      // The query value is percent-decoded, so it can contain CR/LF. Writing
      // that into the raw response would let the client inject additional
      // header lines, so such requests are rejected.
      if (/[\r\n]/.test(bearerHeader)) {
        return abortConnection(socket, 400);
      }
      headers.push(bearerHeader);
    } else if (req.headers.authorization) {
      headers.push('authorization: ' + req.headers.authorization);
    }

    // Listeners may inspect or modify the header lines before they are sent.
    self.emit('headers', headers);

    socket.setTimeout(0);
    socket.setNoDelay(true);
    try {
      socket.write(headers.concat('', '').join('\r\n'));
    } catch (e) {
      try { socket.destroy(); } catch (e2) {}
      return;
    }

    // An `x-wsd-client-key` header line (e.g. added by a 'headers' listener)
    // replaces the registry key; the last matching line wins.
    headers.forEach(function(line) {
      var kv = line.split(': ');
      if (/x-wsd-client-key/i.test(kv[0])) {
        clientKey = kv[1];
      }
    });

    var client = new WebSocket([req, socket, upgradeHead], {
      protocolVersion: version,
      protocol: protocol,
      extensions: extensions
    });

    var finish = function() {
      // defineProperty rather than assignment so that a key such as
      // "__proto__" becomes an ordinary own entry instead of replacing the
      // registry object's prototype.
      Object.defineProperty(self.clients, clientKey, {
        value: client,
        writable: true,
        enumerable: true,
        configurable: true
      });
      client.on('close', function() {
        delete self.clients[clientKey];
      });
      socket.removeListener('error', errorHandler);
      cb(client);
    };

    // Own-property check: the key is client-controlled, and `in` would also
    // match inherited names like "constructor", making the server call
    // methods on something that is not a client.
    if (Object.prototype.hasOwnProperty.call(self.clients, clientKey)) {
      // A client with this key is already connected: close it first and
      // register the new one only after the old one has reported `close`.
      var previous = self.clients[clientKey];
      previous.removeAllListeners('close');
      previous.on('close', function() {
        self.emit('close', 1000, '');
        finish();
      });
      previous.close();
    } else {
      finish();
    }
  };

  // Subprotocol selection step.
  var completeHybiUpgrade1 = function() {
    if (typeof self.options.handleProtocols === 'function') {
      var protList = (protocols || "").split(/, */);
      var callbackCalled = false;
      self.options.handleProtocols(protList, function(result, protocol) {
        callbackCalled = true;
        if (!result) {
          abortConnection(socket, 401);
        } else {
          completeHybiUpgrade2(protocol);
        }
      });
      // The callback has to fire synchronously; if it has not by the time
      // handleProtocols returns, the connection is rejected with 501.
      if (!callbackCalled) {
        abortConnection(socket, 501);
      }
      return;
    }

    // Without a handler the first offered protocol (if any) is selected.
    if (!protocols) {
      completeHybiUpgrade2();
    } else {
      completeHybiUpgrade2(protocols.split(/, */)[0]);
    }
  };

  // Client verification step.
  if (typeof this.options.verifyClient === 'function') {
    var info = {
      origin: origin,
      secure: !!req.connection.authorized || !!req.connection.encrypted,
      req: req
    };
    if (this.options.verifyClient.length === 2) {
      // Two declared parameters: asynchronous form, verifyClient(info, done)
      // with done(result, code); `code` defaults to 401.
      return this.options.verifyClient(info, function(result, code) {
        if (!code) {
          code = 401;
        }
        if (!result) {
          abortConnection(socket, code);
        } else {
          completeHybiUpgrade1();
        }
      });
    } else if (!this.options.verifyClient(info)) {
      return abortConnection(socket, 401);
    }
  }

  completeHybiUpgrade1();
};
/**
 * Decides which offered extensions the server accepts. Only
 * permessage-deflate is considered, and only when it is both enabled in the
 * server options and present in the client's offer.
 *
 * @param {Object} options - server options; `options.perMessageDeflate` is
 *   either falsy (disabled), `true` (enabled with defaults) or a settings
 *   object that is handed to PerMessageDeflate.
 * @param {Object} offer - parsed `Sec-WebSocket-Extensions` offer, keyed by
 *   extension name.
 * @returns {Object} accepted extension instances keyed by extension name;
 *   empty when nothing was accepted.
 * @throws whatever `PerMessageDeflate#accept` throws for an unacceptable offer.
 */
function acceptExtensions(options, offer) {
  var extensions = {};
  var deflateConfig = options.perMessageDeflate;

  if (deflateConfig && offer[PerMessageDeflate.extensionName]) {
    // `true` means "use defaults"; anything else truthy is a settings object
    // and is passed through. (The previous `!options ? options : {}` could
    // never take its first branch here, so settings were always discarded.)
    var perMessageDeflate = new PerMessageDeflate(
      deflateConfig !== true ? deflateConfig : {},
      true
    );
    perMessageDeflate.accept(offer[PerMessageDeflate.extensionName]);
    extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
  }

  return extensions;
}
/**
 * Rejects a handshake: writes a body-less HTTP error response to the socket
 * and then destroys it. Both steps are best effort - failures while writing
 * or destroying are ignored because the connection is being dropped anyway.
 *
 * @param {Object} socket - socket to reject; needs `write` and `destroy`.
 * @param {Number} code - HTTP status code to send.
 */
function abortConnection(socket, code) {
  // Codes missing from http.STATUS_CODES (callers may pass their own through
  // verifyClient) get an empty reason phrase rather than the text "undefined".
  var reason = http.STATUS_CODES[code] || '';
  try {
    socket.write([
      'HTTP/1.1 ' + code + ' ' + reason,
      'Content-type: text/html'
    ].concat('', '').join('\r\n'));
  } catch (e) {
    // ignore errors - we've aborted this connection
  } finally {
    try { socket.destroy(); } catch (e) {}
  }
}
// The WebSocketServer constructor is this module's only export.
module.exports = WebSocketServer;