GCSBOSS/api-joe

View on GitHub
lib/events.js

Summary

Maintainability
A
0 mins
Test Coverage
const cookieSignature = require('cookie-signature');
const cookie = require('cookie');
const WebSocket = require('ws');
const url = require('url');

/* istanbul ignore next */
function checkClientsHealth(){
    // this => wss
    this.clients.forEach(ws => {
        if(ws.isAlive === false)
            return ws.terminate();
        ws.isAlive = false;
        ws.ping(Function.prototype);
    });
}

async function onConnect(ws, req){
    let addr = req.connection.remoteAddress;
    req.pathname = url.parse(req.url, true).pathname

    ws.isAlive = true;
    ws.on('pong', /* istanbul ignore next */ () => ws.isAlive = true);

    this.log.debug({ type: 'ws', addr }, 'New connection from %s', addr);

    ws.on('close', () => {
        this.log.debug({ type: 'ws', addr }, 'Closed connection from %s', addr);

        if(ws.claim){
            this.global.redis.del('joe:ws:' + ws.claim);
            this.global.redis.publish('joe:ws:close', ws.claim);
            clearInterval(ws.ei);
        }
    });

    /* istanbul ignore next */
    ws.on('error', err => this.log.error({ type: 'ws', addr, err }, 'Error in connection from %s', addr));

    let cookies = cookie.parse(req.headers.cookie || '');
    let signed = cookies[this.conf.cookie.name] || '';
    let sessid = cookieSignature.unsign(signed.substr(2), this.conf.cookie.secret);
    let claim = sessid && await this.global.redis.get(sessid);

    if(claim){
        ws.claim = claim;
        this.global.redis.publish('joe:ws:connect', claim);
        this.global.redis.set('joe:ws:' + claim, 1);
        ws.ei = setInterval(/* istanbul ignore next */ () =>
            this.global.redis.expire('joe:ws:' + claim, 12), 10e3);
    }
    else
        ws.public = true;
}

module.exports = {

    buildWSServer(app){
        let wss = new WebSocket.Server({ noServer: true, path: '/events' });
        wss.on('connection', onConnect.bind(app));
        wss.healthChecker = setInterval(checkClientsHealth.bind(wss), 30000);

        wss.doClose = function(){
            clearInterval(wss.healthChecker);
            let cps = [];
            wss.clients.forEach(ws => {
                cps.push(new Promise(done => ws.on('close', done)));
                ws.terminate();
            });
            return Promise.all(cps);
        };

        app._server.on('upgrade', (req, socket, head) =>
            wss.handleUpgrade(req, socket, head, (ws, req) =>
                wss.emit('connection', ws, req)));

        return wss;
    },

    async onBackendEvent(channel, message){
        let targets = {};

        let { public: pub, target, except, members, notMembers, data,
            broadcast } = JSON.parse(message);

        [].concat(target).forEach(c => targets[c] = true);
        if(members){
            let cs = await this.redis.smembers(members);
            cs && cs.forEach(c => targets[c] = true);
        }

        [].concat(except).forEach(c => targets[c] = false);
        if(notMembers){
            let cs = await this.redis.smembers(notMembers);
            cs && cs.forEach(c => targets[c] = false);
        }

        delete targets[undefined];

        data = JSON.stringify(data);
        this.wss.clients.forEach(ws =>
            (ws.public ? pub : targets[ws.claim] || broadcast)
                && ws.send(data));
    }

}