Asymmetrik/mean2-starter

View on GitHub
src/server/app/util/sockets/event.server.socket.js

Summary

Maintainability
D
1 day
Test Coverage
'use strict';

let path = require('path'),
    q = require('q'),
    nodeUtil = require('util'),
    _ = require('lodash'),
    eventEmitter = require(path.resolve('./src/server/app/util/services/event-emitter.service.js')),
    deps = require(path.resolve('./src/server/dependencies.js')),
    config = deps.config,
    logger = deps.logger,
    BaseSocket = require(path.resolve('./src/server/app/util/sockets/base.server.socket.js'));

// Maximum message age, in seconds, read once at module load from config.socketio.ignoreOlderThan.
// When this is null/undefined, age-based filtering in ignorePayload() is skipped entirely;
// otherwise any message whose timestamp is more than this many seconds in the past is dropped.
let ignoreOlderThan = config.socketio.ignoreOlderThan;

/**
 * Socket that relays events from the shared event emitter to a connected client.
 *
 * @param {Object} socketConfig Socket configuration. `emitRateMs` sets the throttle interval
 *   in milliseconds; negative, non-numeric or missing values are treated as 0 (no throttling).
 *   All constructor arguments are forwarded unchanged to BaseSocket.
 */
function EventSocket(socketConfig) {
    const requestedRate = socketConfig.emitRateMs;

    if (requestedRate < 0) {
        // Negative rates make no sense; fall back to "no throttling".
        this._emitRateMs = 0;
    } else {
        // Coerce to a number; NaN (and 0) collapse to 0.
        this._emitRateMs = +requestedRate || 0;
    }

    BaseSocket.apply(this, arguments);
}

// EventSocket extends BaseSocket (prototype chain wired up via util.inherits).
nodeUtil.inherits(EventSocket, BaseSocket);

// Human-readable name used as a prefix in this socket's log messages.
EventSocket.prototype.name = 'EventSocket';

/**
 * Called when the client disconnects: logs the disconnect, then calls
 * unsubscribe() with no specific event name.
 */
EventSocket.prototype.disconnect = function() {
    const socketName = this.name;
    logger.info('%s: Disconnected from client.', socketName);

    this.unsubscribe(null);
};

/**
 * Called when the client connection reports an error: logs the error, then calls
 * unsubscribe() with no specific event name.
 *
 * @param {*} err The error reported for the connection.
 */
EventSocket.prototype.error = function(err) {
    const socketName = this.name;
    logger.error(err, '%s: Client connection error', socketName);

    this.unsubscribe(null);
};

/**
 * Returns the key by which payloads should be identified when sent to the client.
 * getEmitMessage() attaches this value to the outgoing payload as `key`, so it can be used
 * to differentiate between multiple connections that use the same class.
 *
 * This default implementation ignores its argument and always returns null, i.e. no key.
 * It is a hook: subclasses that need to distinguish instances are expected to override it.
 *
 * @param {Object} json The payload, parsed as JSON (unused by the default implementation).
 * @returns {string} The key to attach to the payload when sending to the client.
 *   If a key to distinguish between instances is not relevant, this can return null.
 */
EventSocket.prototype.getEmitMessageKey = function(json) {
    return null;
};

/**
 * Returns the socket event name used when emitting payloads to the client.
 * Uses `this._emitType` when it is set to a truthy value; otherwise falls back to 'payload'.
 * Subclasses may override this to supply their own event name.
 *
 * @returns {string} The event name to transmit through the socket for each payload.
 */
EventSocket.prototype.getEmitType = function() {
    const configuredType = this._emitType;
    if (configuredType) {
        return configuredType;
    }
    return 'payload';
};

/**
 * Builds the object that is sent to the client for a single message: the message's
 * `wrappedPayload` plus the key returned by getEmitMessageKey().
 *
 * @param {Object} json The payload, parsed as JSON.
 * @returns {{wrappedPayload: Object, key: string}} The payload to transmit to the client.
 */
EventSocket.prototype.getEmitMessage = function(json) {
    // Read the payload first so a null/undefined message fails before the key hook runs.
    const wrappedPayload = json.wrappedPayload;
    const key = this.getEmitMessageKey(json);

    return { wrappedPayload, key };
};

/**
 * Reads the message timestamp from `json.wrappedPayload.p.time`, for use in age filtering.
 *
 * @param {Object} json The payload, parsed as JSON.
 * @returns {Number} The value of `json.wrappedPayload.p.time` when `json.wrappedPayload.p`
 *   exists (this may be undefined if `time` itself is missing); null when the message,
 *   its wrappedPayload, or its `p` member is null/undefined.
 */
EventSocket.prototype.getMessageTime = function(json) {
    const wrapped = (null == json) ? null : json.wrappedPayload;
    const body = (null == wrapped) ? null : wrapped.p;

    if (null == body) {
        // Calling debug() with no arguments asks the logger whether debug is enabled,
        // so the message is only serialized when it will actually be logged.
        if (logger.debug()) {
            logger.debug('%s: Unknown time for message: %s', this.name, JSON.stringify(json));
        }
        return null;
    }

    const messageTime = body.time;
    logger.debug('%s: Extracted message time of %d', this.name, messageTime);
    return messageTime;
};

/**
 * Decides whether a payload should be dropped instead of being sent to the client.
 * This base implementation only filters by age: when the module-level `ignoreOlderThan`
 * (seconds) is set and getMessageTime() yields a timestamp, the payload is ignored if that
 * timestamp is more than `ignoreOlderThan` seconds before the current time.
 *
 * @param {Object} json The payload, parsed as JSON.
 * @return {boolean} False if the payload should be sent to the client, true if it should be ignored.
 */
EventSocket.prototype.ignorePayload = function(json) {
    // Age filtering disabled.
    if (null == ignoreOlderThan) {
        return false;
    }

    const now = Date.now();
    const messageTime = this.getMessageTime(json);

    // Messages without a usable timestamp are never filtered by age.
    if (null == messageTime) {
        return false;
    }

    const expiresAt = messageTime + (ignoreOlderThan * 1000);
    if (expiresAt < now) {
        logger.debug('%s: Message is too old: %d is more than %d seconds older than %d', this.name, messageTime, ignoreOlderThan, now);
        return true;
    }

    return false;
};

/**
 * Sends one message to the client. This is a hook so that subclasses can change how
 * messages are emitted (for example, to add more advanced throttling); the default simply
 * emits on the socket returned by getSocket().
 *
 * @param {string} emitType The emit type
 * @param {Object} msg The message to emit
 */
EventSocket.prototype.emitMessage = function(emitType, msg) {
    const clientSocket = this.getSocket();
    clientSocket.emit(emitType, msg);
};

/**
 * Subscribe to an event on the shared event emitter.
 *
 * Builds a listener that forwards each emitted message to socketPayloadHandler() (bound to
 * this socket and to `eventName`), stores it in `this.emitterFunc`, and registers it on the
 * emitter. When `_emitRateMs` is greater than 0 the handler is wrapped in `_.throttle` first.
 *
 * Note: only the most recent listener is kept in `this.emitterFunc`. Calling subscribe() again
 * overwrites it, while the earlier listener remains registered on the emitter.
 *
 * @param {string} eventName The event to subscribe to.
 * @return null if eventName is not set, true if successful
 */
EventSocket.prototype.subscribe = function(eventName) {
    // Ignore bad input data
    if (null == eventName) {
        return null;
    }

    // Simple throttling is done here, if enabled
    if (this._emitRateMs > 0) {
        this.emitterFunc = _.throttle(this.socketPayloadHandler, this._emitRateMs).bind(this, eventName);
    } else {
        this.emitterFunc = this.socketPayloadHandler.bind(this, eventName);
    }
    eventEmitter.getEventEmitter().on(eventName, this.emitterFunc);

    // Report success, as the documented contract promises.
    return true;
};

/**
 * Unsubscribe from a topic.  If no topic is specified, unsubscribes from all topics consumed by this socket.
 *
 * Removes the listener stored in `this.emitterFunc` from the shared event emitter. With a
 * specific `eventName` it is removed from that event only. With a null/undefined `eventName`
 * (which is how disconnect() and error() call this) the listener is removed from every event
 * name currently registered on the emitter, so the socket does not stay subscribed after the
 * client has gone away.
 *
 * Note: only the listener currently held in `this.emitterFunc` can be removed.
 *
 * @param {string} eventName The topic to unsubscribe from (optional).
 */
EventSocket.prototype.unsubscribe = function(eventName) {
    const listener = this.emitterFunc;
    if (typeof listener !== 'function') {
        // Never subscribed; nothing to remove.
        return;
    }

    const emitter = eventEmitter.getEventEmitter();

    // Specific topic requested, or the emitter cannot enumerate its events
    // (assumes a Node-style EventEmitter; eventNames() exists since Node 6).
    if (null != eventName || typeof emitter.eventNames !== 'function') {
        emitter.removeListener(eventName, listener);
        return;
    }

    // No topic given: detach from every event. removeListener() is a no-op for events
    // this listener is not attached to, so sweeping all names is safe.
    emitter.eventNames().forEach(function(registeredName) {
        emitter.removeListener(registeredName, listener);
    });
};

/**
 * Listener body for subscribed events: filters an incoming message and, if it passes,
 * emits it to the client.
 *
 * Steps: null/undefined messages are logged and dropped; ignorePayload() may veto the
 * message; getEmitMessage() builds the outgoing payload (either a value or a promise for
 * one); once resolved, a non-null payload is passed to emitMessage() with getEmitType().
 * A rejected payload promise (or an exception thrown while emitting) is logged at debug
 * level and otherwise ignored. Synchronous exceptions from the filter/build steps are
 * logged as errors and swallowed so the shared emitter is not disrupted.
 *
 * @param {string} eventName The event this listener was subscribed to.
 * @param {Object} message The message delivered by the event emitter.
 */
EventSocket.prototype.socketPayloadHandler = function(eventName, message) {
    const self = this;

    // Gracefully handle empty messages by ignoring and logging
    if (null == message) {
        logger.warn('%s: Ignoring empty message %s', self.name, message);
        return;
    }

    logger.debug('%s: Received Event Message for event %s', self.name, eventName);
    try {
        // Ignore any payloads that don't pass the filter check.
        if (self.ignorePayload(message)) {
            return;
        }

        // Create a payload to send back to the client, containing the message and metadata identifying
        // which stream it pertains to for routing on the client side.
        const pending = self.getEmitMessage(message);

        // The message can be either an object or a promise for an object
        q(pending).then(function(resolved) {
            if (null != resolved) {
                self.emitMessage(self.getEmitType(), resolved);
            }
        }).fail(function(err) {
            // Use `self`, not `this`: inside this callback `this` is not the socket.
            // getUserId() is not defined in this file — presumably inherited from BaseSocket.
            if (logger.debug()) {
                logger.debug('Ignoring payload for user %s: %s', self.getUserId(), err);
            }
        });
    }
    catch (e) {
        logger.error({err: e, msg: message.value }, '%s: Error parsing payload body.', self.name);
    }
};

module.exports = EventSocket;