alexbeletsky/ddp-server-event

View on GitHub
source/index.js

Summary

Maintainability
B
5 hrs
Test Coverage
var util = require('util');
var events = require('events');

var WebSocket = require('faye-websocket');
var EJSON = require('ejson');

function Request (req, sock, body) {
    var ws = new WebSocket(req, sock, body);
    var session = new Date().getTime().toString();

    var send = function (data) {
        ws.send(EJSON.stringify(data));
    };

    var methods = {
        sendResult: function (id, result) {
            send({msg: 'result', id: id, result: result});
            send({msg: 'updated', id: id});
        },

        sendError: function (id, error) {
            send({msg: 'error', id: id, error: error});
        },

        sendAdded: function (id, collection, fields) {
            send({msg: 'added', id: id, collection: collection, fields: fields});
        },

        sendChanged: function (id, collection, fields, cleared) {
            send({msg: 'changed', id: id, collection: collection, fields: fields, cleared: cleared});
        },

        sendRemoved: function (id, collection, fields, cleared) {
            send({msg: 'removed', id: id, collection: collection, fields: fields, cleared: cleared});
        },

        sendReady: function (id) {
            send({msg: 'ready', subs: [id]});
        },

        sendNosub: function (id, error) {
            send({msg: 'nosub', id: id, error: error});
        },

        sendEvent: function (msg, data) {
            data.msg = msg;
            send(data);
        }
    };

    var request = {
        id: session,

        handle: function (ddp) {
            ws.on('open', handleOpen.bind(ddp));
            ws.on('message', handleMessage.bind(ddp));
            ws.on('close', handleClose.bind(ddp));
        },

        close: function () {
            if (ws) {
                ws.close();
                ws = session = null;
            }
        }
    };

    function handleOpen() {
        this.emit('connected', request);
    }

    function handleMessage(event) {
        var data = JSON.parse(event.data);
        var message = data.msg;

        // binding EventEmitter .emit() method
        // http://stackoverflow.com/a/8016478/386751
        methods._events = this._events;

        // hanle special cases as `connected` and `pong`
        if (message === 'connect') {
            methods.sendEvent('connected', {session: session});
        } else if (message === 'ping') {
            methods.sendEvent('pong', {id: data.id});
        // handle rpc calls
        } else if (message === 'method') {
            var prefixed = 'method:' + data.method;
            this.emit.call(methods, prefixed, data.id, data.params);
        // sub and unsub
        } else if (message === 'sub' || message === 'unsub') {
            this.emit.call(methods, message, data.id, data.name, data.params);
        }
    }

    function handleClose () {
        this.emit('disconnected', request);
        ws = session = null;
    }

    return request;
}

function Ddp(options) {
    if (!options || !options.server) {
        throw new Error('missing server instance');
    }

    var ddp = this;
    var server = options.server;
    var requests = [];

    events.EventEmitter.call(this);

    ddp.on('connected', function (request) {
        requests.push(request);
    });

    ddp.on('disconnected', function (request) {
        requests.splice(requests.indexOf(request), 1);
    });

    server.on('upgrade', function (req, sock, body) {
        if (WebSocket.isWebSocket(req)) {
            var request = new Request(req, sock, body);
            request.handle(ddp);
        }
    });

    ddp.listen = function () {
        ddp.emit('ready', ddp);
        server.listen.apply(server, arguments);
    };

    ddp.close = function () {
        // closing all active requests..
        requests.forEach(function (r) {
            r.close();
        });

        requests = [];

        // close http server
        server.close.apply(server, arguments);
    };

}

util.inherits(Ddp, events.EventEmitter);

module.exports = Ddp;