RackHD/on-wss

View on GitHub
lib/ws-service.js

Summary

Maintainability
D
2 days
Test Coverage
// Copyright 2016, EMC, Inc.

'use strict';

var express = require('express');
var cors = require('cors');
var fs = require('fs');
var http = require('http');
var https = require('https');

module.exports = webSocketServiceFactory;

webSocketServiceFactory.$provide = 'Services.WebSocket';
webSocketServiceFactory.$inject = [
  'Logger',
  'Services.Configuration',
  'uuid',
  'WebSocketError',
  'WebSocketResources',
  'WebSocketServer'
];

function webSocketServiceFactory(
    Logger,
    serviceConfiguration,
    uuid,
    WebSocketError,
    webSocketResources,
    WebSocketServer
) {
    var logger = Logger.initialize(webSocketServiceFactory);

    class WebSocketService {
        constructor() {
            this.handlers = this._defaultMessageHandlers(),
            this.resources = webSocketResources;
            this.sessions = {};
            var app = express();
            app.use(cors());
            app.options('*', cors());
            app.get('/', function(req, res) {
                res.send('RackHD WebSocketServer');
            });
            this.app = app;
            this.ssl = serviceConfiguration.get('wssSsl', false);
            if (this.ssl) {
                var pfxPath = serviceConfiguration.get('wssSslPfx');
                if (pfxPath) {
                    this.sslOptions = {pfx: fs.readFileSync(pfxPath)};
                } else {
                    var certPath = serviceConfiguration.get('wssSslCert', 'data/dev-cert.pem'),
                        keyPath = serviceConfiguration.get('wssSslKey', 'data/dev-key.pem')
                    this.sslOptions = {
                        cert: fs.readFileSync(certPath),
                        key: fs.readFileSync(keyPath)
                    };
                }
            }
            this.serverOptions = {
                address: serviceConfiguration.get('wssBindAddress', '0.0.0.0'),
                port: serviceConfiguration.get('wssBindPort', 9100)
            }
            console.log(this.serverOptions, this.sslOptions);
            this.server = this.ssl ?
                https.createServer(this.sslOptions, app) :
                http.createServer(app);
            this.wssOptions = {
                server: this.server
            };
        }

        error(wsError) { logger.error(wsError.message, wsError); }

        broadcast(data, skipFunc) {
            if (!this.webSocketServer) {
                this.error(new WebSocketError('Broadcast failed, no server is available.'));
            }
            const wsClientBroadcast = wsConn => {
                if (typeof skipFunc === 'function' && skipFunc(wsConn)) { return; }
                try { wsConn.send(data); } catch (err) {
                    logger.warning('Failed to send message to client: %s'.format(wsConn.id));
                    this._closeConnection(wsConn);
                }
            }
            this.webSocketServer.clients.forEach(wsClientBroadcast);
        }

        start(callback) {
            try {
                logger.info('Creating WebSocketServer, and initializing WebSocketService...');
                this.webSocketServer = new WebSocketServer(this.wssOptions);
                this.webSocketServer.on('connection', this._handleConnection.bind(this));
            }
            catch (err) {
                this.error(new WebSocketError(err.message, {originalError: err}));
            }
            return new Promise(function(resolve, reject) {
                return this.server.listen(
                    this.serverOptions.port,
                    this.serverOptions.address,
                    function(error) {
                        if (error) {
                            this.error(error);
                            callback(error);
                            return reject(error);
                        }
                        logger.info('WebSocketServer is listening on ' +
                            this.serverOptions.address + ':' +
                            this.serverOptions.port + '...');
                        callback();
                        resolve();
                    }.bind(this)
                );
            }.bind(this));
        }

        stop(callback) {
            logger.info('Stopping WebSocketService...');
            if (this.webSocketServer) { this.webSocketServer.close(callback); }
            delete this.webSocketServer;
        }

        _closeConnection(wsConn) {
            if (!this.sessions[wsConn.id]) { return; }
            logger.debug('Client: %s disconnected from WebSocketServer.'.format(wsConn.id));
            delete this.sessions[wsConn.id];
            if (wsConn.watchers) {
                Object.keys(wsConn.watchers).forEach(watcherHash => {
                    var watcherList = wsConn.watchers[watcherHash];
                    watcherList && watcherList.forEach(watcher => watcher && watcher.dispose());
                });
                delete wsConn.watchers;
            }
            try { wsConn.terminate(); } catch (err) {}
        }

        _defaultMessageHandlers() {
            return {
                error:  msg => this.error(msg.message, {errorObject: msg}),
                init:  (msg, wsConn) => wsConn.sendSession(),
                all:   (msg, wsConn, rawMsg) => this._forwardResourceMethod('all', msg, wsConn, rawMsg),
                get:   (msg, wsConn, rawMsg) => this._forwardResourceMethod('get', msg, wsConn, rawMsg),
                query: (msg, wsConn, rawMsg) => this._forwardResourceMethod('query', msg, wsConn, rawMsg),
                stop:  (msg, wsConn, rawMsg) => this._forwardResourceMethod('stop', msg, wsConn, rawMsg),
                watch: (msg, wsConn, rawMsg) => this._forwardResourceMethod('watch', msg, wsConn, rawMsg)
            };
        }

        _forwardResourceMethod(method, msg, wsConn, rawMsg) {
            logger.debug('Client: %s requested a resource call.'.format(wsConn.id),
                {rawMessage: rawMsg});
            var resource = msg.resource || wsConn.upgradeReq.url.split('/').pop();
            if (this.resources[resource] && typeof this.resources[resource][method] === 'function') {
                return this.resources[resource][method](msg, wsConn, rawMsg);
            }
            this.error(new WebSocketError('Invalid WebSocketResource: %s'.format(msg.resource),
                {rawMessage: rawMsg}));
        }

        _handleConnection(wsConn) {
            wsConn.id = uuid('v4');
            this.sessions[wsConn.id] = wsConn;
            logger.debug('Client: %s connected to WebSocketServer.'.format(wsConn.id));
            wsConn.addWatcher = (params, watcher) => {
                var hash = JSON.stringify(params);
                wsConn.watchers = wsConn.watchers || {};
                wsConn.watchers[hash] = wsConn.watchers[hash] || [];
                wsConn.watchers[hash].push(watcher);
                return watcher;
            };
            wsConn.removeWatchers = params => {
                var hash = JSON.stringify(params);
                if (wsConn.watchers && wsConn.watchers[hash]) {
                    wsConn.watchers[hash].forEach(watcher => {
                        watcher && watcher.dispose();
                    });
                    delete wsConn.watchers[hash];
                    return true;
                }
            };
            wsConn.sendError = (err, resource) => wsConn.sendObject({
              handler: 'error',
              resource: resource,
              params: err
            });
            wsConn.sendItem = (resource, id, data) => wsConn.sendObject({
              handler: 'item',
              resource: resource,
              id: id,
              data: data
            });
            wsConn.sendList = (resource, items) => wsConn.sendObject({
              handler: 'list',
              resource: resource,
              items: items
            });
            wsConn.sendObject = object => wsConn.send(JSON.stringify(object));
            wsConn.sendRemove = (resource, id, data) => wsConn.sendObject({
              handler: 'remove',
              resource: resource,
              id: id,
              data: data
            });
            wsConn.sendSession = () => wsConn.sendObject({
              handler: 'session',
              id: wsConn.id
            });
            wsConn.on('message', this._handleMessage.bind(this, wsConn));
            wsConn.on('error', this._closeConnection.bind(this, wsConn));
            wsConn.on('close', this._closeConnection.bind(this, wsConn));
        };

        _handleMessage(wsConn, wsMsg) {
            logger.debug('Client: %s messsage was received.'.format(wsConn.id),
                {rawMessage: wsMsg});
            var msg;
            try { msg = JSON.parse(wsMsg); } catch (err) {
                return this.error(new WebSocketError('Malformed message from: %s'.format(wsConn.id),
                    {rawMessage: wsMsg}));
            }
            if (typeof this.handlers[msg.handler] === 'function') {
                return this.handlers[msg.handler](msg, wsConn, wsMsg);
            }
            this.error(new WebSocketError('Invalid message from: %s'.format(wsConn.id),
                {rawMessage: wsMsg, parsedMessage: msg}));
        };
    }

    return new WebSocketService();
}