lib/util/websocket/msg_src.js

Summary

Maintainability
A
35 mins
Test Coverage
'use strict';


var miloCore = require('milo-core')
    , MessageSource = miloCore.classes.MessageSource
    , _ = miloCore.proto
    , logger = miloCore.util.logger
    , uniqueId = require('../../util/unique_id')
    , config = require('../../config');


var WSMessageSource = _.createSubclass(MessageSource, 'WSMessageSource', true);


_.extendProto(WSMessageSource, {
    // implementing MessageSource interface
    addSourceSubscriber: addSourceSubscriber,
    removeSourceSubscriber: removeSourceSubscriber,
    
    // class specific methods
    handleEvent: WSMessageSource$handleEvent,
    connect: WSMessageSource$connect,
    trigger: WSMessageSource$trigger
});


module.exports = WSMessageSource;


function WSMessageSource$connect(options) {
    this._options = options = options || {};

    var host = options.host || window.location.host.replace(/:.*/, '')
        , port = options.port || '8080';

    if (this._ws) {
        // TODO should unsubscribe differently
        this._ws.onopen = this.ws.onmessage = this.ws.onclose = this.ws.onerror = undefined;
        this._ws.close();
    }

    this._ws = new WebSocket('ws://' + host + ':' + port);

    // TODO reconnect
}



function addSourceSubscriber (sourceMessage) {
    _wsSubscriberMethod.call(this, 'addEventListener', sourceMessage);
}


function removeSourceSubscriber (sourceMessage) {
    _wsSubscriberMethod.call(this, 'removeEventListener', sourceMessage);
}


function _wsSubscriberMethod (method, sourceMessage) {    
    if (!this._ws) return logger.error('websocket is not created');
    this._ws[method](sourceMessage, this);
}


// event dispatcher - as defined by Event DOM API
function WSMessageSource$handleEvent (event) {
    this.dispatchMessage(event.type, event);
}


function WSMessageSource$trigger (msg, data, callback) {
    if (!this._ws) return logger.error('websocket is not created');

    data = data || {};
    data.type = msg;

    var self = this;
    
    if (callback) {
        data.callbackCorrId = uniqueId();
        var interval = _.delay(onTimeout, config.websocket.rpc.timeout);
        toggleRpcSubscription('once', data.callbackCorrId);
    }    

    this._ws.send(JSON.stringify(data));


    function onTimeout() {
        toggleRpcSubscription('off', data.callbackCorrId);
        callback(new Error('websocket rpc: timeout'));
    }

    function onResponse(msg, msgData) {
        clearInterval(interval);
        if (typeof msgData == 'object') {
            var err = msgData.error ? new Error(msgData.error) : null;
            callback(err, msgData.data);
        } else
            callback(new Error('websocket rpc: invalid response data'), msgData);
    }

    function toggleRpcSubscription(onOff, corrId) {
        self.messenger[onOff](config.websocket.rpc.responsePrefix + corrId, onResponse);
    }
}