Toollabs/video2commons

View on GitHub
video2commons-socketio/index.js

Summary

Maintainability
A
25 mins
Test Coverage
/* eslint-env node */
/* eslint no-console: 0 */

/* ===== Globals Declaration ===== */

var express = require( 'express' ),
    http = require( 'http' ),
    io = require( 'socket.io' )(),
    redis = require( 'redis' ),
    request = require( 'request' );

// Port to listen on, read from the PORT environment variable.
var port = parseInt( process.env.PORT, 10 );

// Deployment configuration (Redis host/password, frontend URI, session key).
var config = require( '../config.json' );

// Express application that backs the HTTP server socket.io is attached to.
var app = express();

// Connection options shared by both Redis clients below.
var redisparams = {
    host: config.redis_host,
    password: config.redis_pw,
    db: 3
};

// Two clients are created from the same options: `redisconnection` is used
// for GET commands, `redissubscription` for pattern subscriptions.
var redisconnection = redis.createClient( redisparams );
var redissubscription = redis.createClient( redisparams );

/* ===== HTTP / Socket.io Request Handling ===== */

// Catch-all route: every HTTP request, regardless of method or path, is
// answered with a redirect to the configured web frontend.
app.all( '*', function redirectToFrontend( incoming, outgoing ) {
    var target = config.webfrontend_uri;
    outgoing.redirect( target );
} );

// Handle a newly connected socket.io client.
//
// The client is expected to send an 'auth' event whose payload carries
// `iosession` and `_csrf_token`. Authentication proceeds in steps:
//   1. look up 'iosession:<iosession>' in Redis to obtain the session key,
//   2. look up 'session:<sessionkey>' and parse it as JSON,
//   3. compare the CSRF token from the payload with the one in the session,
//   4. fetch api/status from the web frontend using the session cookie.
// On success the status is emitted to the client as a 'status' event and the
// socket joins `status.rooms`. Any failure disconnects the socket.
//
// Everything below runs inside asynchronous callbacks, so an exception would
// be uncaught; client input, Redis errors and JSON parsing are therefore all
// checked explicitly instead of being allowed to throw.
io.on( 'connection', function ( socket ) {
    console.log( '[' + new Date() + '] Connected: ' + socket.id );

    socket.on( 'auth', function ( data ) {
        // The payload is supplied by the client and may be missing entirely.
        if ( !data || typeof data !== 'object' ) {
            socket.disconnect();
            return;
        }

        redisconnection.get( 'iosession:' + data.iosession, function ( err, sessionkey ) {
            // On a Redis error the reply is undefined rather than null, so the
            // error has to be checked as well as the "key not found" case.
            if ( err || sessionkey === null || sessionkey === undefined ) {
                socket.disconnect();
                return;
            }

            redisconnection.get( 'session:' + sessionkey, function ( err, sessiondata ) {
                if ( err || sessiondata === null || sessiondata === undefined ) {
                    socket.disconnect();
                    return;
                }

                var session;
                try {
                    session = JSON.parse( sessiondata );
                } catch ( parseError ) {
                    socket.disconnect();
                    return;
                }
                // JSON such as `null` or a bare number parses fine but is not
                // a usable session object.
                if ( session === null || typeof session !== 'object' ) {
                    socket.disconnect();
                    return;
                }

                // eslint-disable-next-line no-underscore-dangle
                if ( data._csrf_token !== session._csrf_token ) {
                    socket.disconnect();
                    return;
                }

                // Ask the web frontend for this user's status, presenting the
                // session key as the 'v2c-session' cookie.
                var j = request.jar();
                var cookie = request.cookie( 'v2c-session=' + sessionkey );
                var url = 'https:' + config.webfrontend_uri + 'api/status';
                j.setCookie( cookie, url );
                request( {
                    url: url,
                    jar: j,
                    headers: {
                        'User-Agent': 'video2commons-socketio'
                        // 'X-V2C-Session-Bypass': config.session_key
                    }
                }, function ( error, response, body ) {
                    if ( error || response.statusCode !== 200 ) {
                        socket.disconnect();
                        return;
                    }

                    var status;
                    try {
                        status = JSON.parse( body );
                    } catch ( parseError ) {
                        socket.disconnect();
                        return;
                    }
                    if ( status === null || typeof status !== 'object' ) {
                        socket.disconnect();
                        return;
                    }

                    socket.emit( 'status', status );
                    socket.join( status.rooms );
                } );
            } );
        } );
    } );

    socket.on( 'disconnect', function () {
        console.log( '[' + new Date() + '] Disconnected: ' + socket.id );
    } );
} );

// Create the HTTP server for the Express app, start it listening on `port`,
// and attach socket.io to that same server.
var httpServer = http.createServer( app );
httpServer.listen( port );
io.listen( httpServer );

/* ===== Socket.io Client Notification ===== */

/**
 * Invoke a callback once for every socket currently in a room.
 *
 * The list of client ids is obtained asynchronously via `clients()`, then each
 * id is resolved to a socket object through the namespace's `connected` map.
 * An id that no longer resolves to a socket (for example because the client
 * disconnected in the meantime) is skipped, so `cb` never receives undefined.
 *
 * @param {string} room Name of the socket.io room.
 * @param {Function} cb Called as cb( socket ) for each socket in the room.
 * @throws Rethrows the error reported by `clients()`, if any.
 */
var forEachSocketInRoom = function ( room, cb ) {
    var ns = io.in( room );
    ns.clients( function ( error, clients ) {
        if ( error ) {
            throw error;
        }
        clients.forEach( function ( clientId ) {
            var socket = ns.connected[ clientId ];
            if ( socket ) {
                cb( socket );
            }
        } );
    } );
};

// Task notification helpers. Each task has a socket.io room named after its
// task id; clients in that room receive 'update' and 'remove' events for it.

/**
 * addtask( taskid, user ): subscribe interested sockets to a new task. Every
 * socket in the 'tasks:<user>' room and every socket in the 'alltasks' room
 * joins the room named `taskid`.
 *
 * updatetask( taskid, data ): broadcast an 'update' event for a task. When
 * `data` is truthy it is emitted as-is. Otherwise, if the task room has at
 * least one client, the current status is fetched from the web frontend's
 * api/status-single endpoint and its `value` field is emitted instead; an
 * empty room triggers no request.
 *
 * removetask( taskid ): emit a 'remove' event to the task room and then make
 * every socket leave it.
 */
var addtask = function ( taskid, user ) {
        var joinTaskRoom = function ( socket ) {
            socket.join( taskid );
        };
        forEachSocketInRoom( 'tasks:' + user, joinTaskRoom );
        forEachSocketInRoom( 'alltasks', joinTaskRoom );
    },
    updatetask = function ( taskid, data ) {
        if ( data ) {
            io.in( taskid ).emit( 'update', taskid, data );
            return;
        }

        io.in( taskid ).clients( function ( error, clients ) {
            if ( error ) {
                throw error;
            }
            // Do nothing if room is empty. `clients` is an array, which is
            // truthy even when empty, so its length has to be checked.
            if ( !clients || clients.length === 0 ) {
                return;
            }

            request( {
                url: 'https:' + config.webfrontend_uri + 'api/status-single?task=' +
                    encodeURIComponent( taskid ),
                headers: {
                    'User-Agent': 'video2commons-socketio',
                    'X-V2C-Session-Bypass': config.session_key
                }
            }, function ( fetchError, response, body ) {
                if ( fetchError || response.statusCode !== 200 ) {
                    console.log( 'failed status fetching for ' + taskid );
                    return;
                }

                // A malformed body must not throw out of this callback.
                var result;
                try {
                    result = JSON.parse( body );
                } catch ( parseError ) {
                    console.log( 'failed status fetching for ' + taskid );
                    return;
                }
                if ( result === null || typeof result !== 'object' ) {
                    console.log( 'failed status fetching for ' + taskid );
                    return;
                }

                io.in( taskid ).emit( 'update', taskid, result.value );
            } );
        } );
    },
    removetask = function ( taskid ) {
        io.in( taskid ).emit( 'remove', taskid );
        forEachSocketInRoom( taskid, function ( socket ) {
            socket.leave( taskid );
        } );
    };

// Log errors from both Redis clients. An 'error' event emitted on a client
// that has no 'error' listener is thrown as an uncaught exception, so the
// subscription client needs a listener just as much as the command client.
[ redisconnection, redissubscription ].forEach( function ( client ) {
    client.on( 'error', function ( err ) {
        console.log( 'Error: ' + err );
    } );
} );

/* ===== Redis Subscription ===== */

// Keyspace notifications. Channels have the form '__keyspace@<db>__:<key>' and
// the message is the name of the operation performed on that key. Only keys
// in db 1 that start with 'celery-task-meta-' are acted on; the remainder of
// the key is used as the task id:
//   'set'     -> updatetask( id )  (no data passed, so the status is fetched)
//   'expired' -> removetask( id )
redissubscription.psubscribe( '__keyspace@?__:*' );
redissubscription.on( 'pmessage', function ( pattern, channel, message ) {
    if ( pattern !== '__keyspace@?__:*' ) {
        return;
    }

    var match = /^__keyspace@(\d)__:(.+)$/.exec( channel );
    // The subscription glob is looser than this regex (e.g. '?' also matches
    // a non-digit); ignore anything the regex rejects instead of throwing.
    if ( match === null ) {
        return;
    }

    var db = match[ 1 ],
        key = match[ 2 ],
        action = message;

    if ( db === '1' && key.startsWith( 'celery-task-meta-' ) ) {
        var id = key.slice( 'celery-task-meta-'.length );
        if ( action === 'set' ) {
            updatetask( id );
        } else if ( action === 'expired' ) {
            removetask( id );
        }
    }
} );

// Application notifications published on 'v2cnotif:<type>' channels. The
// message is a JSON object; <type> selects the action:
//   'add'    -> addtask( data.taskid, data.user )
//   'update' -> updatetask( data.taskid, data.data )
//   'remove' -> removetask( data.taskid )
// Messages that are not valid JSON objects are logged and ignored, since an
// exception here would be uncaught inside the event listener.
redissubscription.psubscribe( 'v2cnotif:*' );
redissubscription.on( 'pmessage', function ( pattern, channel, message ) {
    if ( pattern !== 'v2cnotif:*' ) {
        return;
    }

    var type = channel.slice( 'v2cnotif:'.length ),
        data;
    try {
        data = JSON.parse( message );
    } catch ( parseError ) {
        console.log( 'Ignoring malformed notification on ' + channel + ': ' + parseError );
        return;
    }
    if ( data === null || typeof data !== 'object' ) {
        console.log( 'Ignoring malformed notification on ' + channel );
        return;
    }

    if ( type === 'add' ) {
        addtask( data.taskid, data.user );
    } else if ( type === 'update' ) {
        updatetask( data.taskid, data.data );
    } else if ( type === 'remove' ) {
        removetask( data.taskid );
    }
} );