
View on GitHub


2 days
Test Coverage
'use strict';

var path = require('path'),
    q = require('q'),
    nodeUtil = require('util'),
    _ = require('lodash'),
    ObjectID = require('mongoose').Types.ObjectId,

    deps = require(path.resolve('./src/server/dependencies.js')),
    config = deps.config,
    logger = deps.logger,

    KafkaConsumer = require(path.resolve('./src/server/lib/kafka-consumer.js')),
    BaseSocket = require(path.resolve('./src/server/app/util/sockets/base.server.socket.js'));

// If this is not null, ignore any messages that are older than this number of seconds.
var ignoreOlderThan = config.socketio.ignoreOlderThan;

 * Kafka Socket that has some default consumer message and connection handlers.
 * This is an abstract class and should be overridden by each module that requires it.
 * @abstract
function KafkaSocket(socketConfig) {
     * @type {{string}: {KafkaConsumer}}
     * Cache of connections by Kafka topics
    this._connections = {};

    this._emitRateMs = socketConfig.emitRateMs < 0 ? 0 : (+socketConfig.emitRateMs || 0);
    BaseSocket.apply(this, arguments);

    // Create a single groupId for each socket that will be shared by all consumers on this websocket
    var socket = this.getSocket();
    socket.kafkaGroupId = socket.kafkaGroupId || 'nodejs-' + new ObjectID();

        BaseSocket); = 'KafkaSocket';

 * Returns the group ID associated with the socket. This can be shared by all consumers registered
 * by the socket, since they will each be listening to different topics.
 * @returns {string}
KafkaSocket.prototype.getGroupId = function() {
    return this.getSocket().kafkaGroupId;

 * Handle socket disconnects by unsubscribing from all Kafka topics.
KafkaSocket.prototype.disconnect = function() {'%s: Disconnected from client.',;

 * Handle socket errors by unsubscribing from all Kafka topics.
KafkaSocket.prototype.error = function(err) {
    logger.error(err, '%s: Client connection error',;

 * Returns the key by which payloads should be identified when sent to the client.
 * This will be added to the payload sent to the client, and can be used to differentiate
 * between multiple connections that use the same class.
 * @param {Object} rawMessage The raw Kafka payload that was received.
 * @param {Object} json The Kafka payload, parsed as JSON.
 * @param {KafkaConsumer} consumer The KafkaConsumer instance that received the message.
 * @returns {string} The key to attach to the payload when sending to the client.
 *   If a key to distinguish between instances is not relevant, this can return null.
KafkaSocket.prototype.getEmitMessageKey = function(rawMessage, json, consumer) {
    return null;

 * Returns the name of the socket event that will be transmitted to the client.
 * This should be overridden by each implementing class.
 * @returns {string} The event name to transmit through the socket for each payload.
KafkaSocket.prototype.getEmitType = function() {
    return this._emitType || 'payload';

 * Constructs a payload to transmit to the client for each Kafka message.
 * @param {Object} rawMessage The raw Kafka payload that was received.
 * @param {Object} json The Kafka payload, parsed as JSON.
 * @param {KafkaConsumer} consumer The KafkaConsumer instance that received the message.
 * @returns {{wrappedPayload: Object, key: string}} The payload to transmit to the client.
KafkaSocket.prototype.getEmitMessage = function(rawMessage, json, consumer) {
    return {
        wrappedPayload: json.wrappedPayload,
        key: this.getEmitMessageKey(rawMessage, json, consumer)

 * Extracts a timestamp from the Kafka payload, which can be used for filtering messages.
 * @param {Object} json The Kafka payload, parsed as JSON.
 * @returns {Number} Returns the timestamp of the payload as a Long.
KafkaSocket.prototype.getMessageTime = function(json) {
    // Default to extracting time from wrapped payload
    if (null != json && null != json.wrappedPayload && null != json.wrappedPayload.p) {
        var time = json.wrappedPayload.p.time;
        logger.debug('%s: Extracted message time of %d',, time);
        return time;

    if (logger.debug()) { // is debug enabled?
        logger.debug('%s: Unknown time for message: %s',, JSON.stringify(json));

    return null;

 * Filters a Kafka payload to determine whether it should be transmitted. This should be overridden by the
 * implementing class. It does not need to filter by date, as this is done automatically for all payloads.
 * @param {Object} rawMessage The raw Kafka payload that was received.
 * @param {Object} json The Kafka payload, parsed as JSON.
 * @param {KafkaConsumer} consumer The KafkaConsumer instance that received the message.
 * @return {boolean} False if the payload should be sent to the client, true if it should be ignored.
KafkaSocket.prototype.ignorePayload = function(rawMessage, json, consumer) {
    // Ignore any payloads that are too old.
    if (null != ignoreOlderThan) {
        var now =;
        var messageTime = this.getMessageTime(json);
        if (null != messageTime) {
            if (messageTime + (ignoreOlderThan * 1000) < now) {
                logger.debug('%s: Message is too old: %d is more than %d seconds older than %d',, messageTime, ignoreOlderThan, now);
                return true;
    return false;

 * Default payload handler for Socket-based Kafka listeners. Filters messages by time, then formats
 * and transmits through the socket to the client.
 * @param {KafkaConsumer} consumer The KafkaConsumer instance that received the message.
 * @param {Object} message The raw Kafka payload containing several keys.
KafkaSocket.prototype.kafkaPayloadHandler = function(consumer, message) {
    // Gracefully handle empty messages by ignoring and logging
    if (null == message) {
        logger.warn('%s: Ignoring empty message %s',, message);

    var self = this;
    logger.debug('%s: Received Kafka Message on topic %s, partition %d, offset %d: %s',, message.topic, message.partition, message.offset, message.value);
    try {
        // Unwrap the payload
        var json = JSON.parse(message.value);
        if (null != json) {

            // Ignore any payloads that don't pass the filter check.
            if (self.ignorePayload(message, json, consumer)) {
            // Create a payload to send back to the client, containing the message and metadata identifying
            // which stream it pertains to for routing on the client side.
            var msg = self.getEmitMessage(message, json, consumer);

            // The message can be either an object or a promise for an object
            q(msg).then(function(msg) {
                if (null != msg) {
                    self.emitMessage(self.getEmitType(), msg);
            }).fail(function(err) {
                if (logger.debug()) {
                    logger.debug('Ignoring payload for user %s: %s', this.getUserId(), err);
    catch (e) {
        logger.error({err: e, msg: message.value }, '%s: Error parsing payload body.',;

 * Allows child sockets to customize the way messages are emitted, for instance to provide more advanced throttling.
 * Default implementation emits to the socket as usual.
 * @param {string} emitType The emit type
 * @param {Object} msg The message to emit
KafkaSocket.prototype.emitMessage = function(emitType, msg) {
    this.getSocket().emit(emitType, msg);

 * Allows implementing classes to add listeners to a KafkaConsumer whenever it is created.
 * @param {KafkaConsumer} consumer The consumer that was created. You can get the topic from consumer.topic
KafkaSocket.prototype.onNewConsumer = function(consumer) {
    // Handle incoming connections on the consumer

    // Simple throttling is done here, if enabled
    if (this._emitRateMs > 0) {
        consumer.on('message', _.throttle(this.kafkaPayloadHandler, this._emitRateMs).bind(this, consumer));
    } else {
        consumer.on('message', this.kafkaPayloadHandler.bind(this, consumer));

 * Returns the consumer that has already been registered on a particular topic.
 * @param {string} topic The topic.
 * @returns {KafkaConsumer|null} The consumer registered for the topic, or null if no consumer has been registered.
KafkaSocket.prototype.getConsumer = function(topic) {
    // Ignore bad input data
    if (null == topic) {
        return null;

    // If our connection cache has a stream to this topic already, return it.
    if (this._connections.hasOwnProperty(topic)) {
        return this._connections[topic];

    return null;

 * Subscribe to a topic.  This creates a new KafkaConsumer for the topic or retrieves an existing one.
 * @param {string} topic The Kafka topic to use for message retrieval.
 * @return {KafkaConsumer} A consumer for the topic.
KafkaSocket.prototype.subscribe = function(topic) {
    // Ignore bad input data
    if (null == topic) {
        return null;

    // If our connection cache has a stream to this topic already, use it. Otherwise, create a new one.
    if (this._connections.hasOwnProperty(topic)) {
        return this._connections[topic];

    // Create a new consumer for the topic
    var consumer = new KafkaConsumer(topic, this.getGroupId());

    // Store in internal object cache
    this._connections[topic] = consumer;

    return consumer;

 * Unsubscribe from a topic.  If no topic is specified, unsubscribes from all topics consumed by this socket.
* @param {string} topic The Kafka topic to unsubscribe from (optional).
KafkaSocket.prototype.unsubscribe = function(topic) {
    var self = this;

    // Unsubscribe from only the specified topic
    if (null != topic) {
        if (null != this._connections[topic]) {
            logger.debug('%s: Closing connection to topic: %s',, topic);
            delete this._connections[topic];
    // Unsubscribe from ALL topics
    else {
        logger.debug('%s: Closing connection to all topics',;
        _.forOwn(this._connections, function(connection, topic) {
            logger.debug('%s: Closing connection to topic: %s',, topic);
        this._connections = {};

module.exports = KafkaSocket;