
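// NOTE: the following text is page chrome captured from a web code viewer, not part of the module:
//   "View on GitHub" / "4 hrs" / "Test Coverage"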
'use strict';

let kafka = require('kafka-node'),
    path = require('path'),
    q = require('q'),
    events = require('events'),

    config = require(path.resolve('./src/server/config.js')),
    logger = require(path.resolve('./src/server/lib/bunyan.js')).logger,
    util = require('util');

 * Generate a random group id so all of our listeners go into different consumer pools.
 * @param topic
 * @returns {string}
function createGroupId(topic) {
    return `nodejs-${topic}-${Math.random()}`;

 * Gets a new Kafka ConsumerGroup.  We need to do this for each new connection, according to Kafka best practices.
 * @returns A promise to return a ConsumerGroup.
function getConsumer(topic, groupId, extraOptions) {

    // Default options
    let options = {
        fromOffset: 'latest',
        outOfRangeOffset: 'latest'

    // If extraOptions are provided merge with default Options
    if (null != extraOptions) {
        options = Object.assign(options, extraOptions);

    let connectionOptions = {
        host: config.kafka.zookeeper,
        groupId: groupId

    // Merge connectionOptions
    options = Object.assign(options, connectionOptions);

    let consumer = new kafka.ConsumerGroup(options, topic);

    return q.resolve(consumer);

function disconnect(connection) {
    // Only acts on a connection whose state is 'connected'; any other state is left untouched.
    // NOTE(review): this copy of the function looks truncated: closing braces are absent, the
    // `.then(...)` chain below has no visible receiver, and the `try` body is empty.  Restore the
    // missing lines from version control before relying on this block.
    if (connection.state === 'connected') {

        // Mark the connection as closing before the asynchronous teardown starts.
        connection.state = 'closing';
            // NOTE(review): the expression that produces the promise for this chain is missing here.
            .then((consumer) => {
                // Nothing is attempted when the promise yields null/undefined.
                if (null != consumer) {
                    try {
                    // NOTE(review): the statement guarded by this try block is missing from this copy.
                    catch (err) {
                        // Ignore, since we're closing the connection anyway
            .fail((err) => {
                logger.error(err, 'Kafka Consumer: error closing consumer');

                // Re-emit this error to listeners.
                connection.emit('error', err);

 * Creates a new set of environment state for a particular connection.  This connection will automatically
 * reconnect itself if there are errors.
 * @param {string} topic The topic to connection to.
 * @param {string=} groupId Optionally, a groupId to use for this connection.
 * @param {boolean=} currentOffset Optionally, use the current offset in zookeeper, otherwise use latest offset [default].
function KafkaConsumer(topic, groupId, currentOffset, extraOptions) {
    this.topic = topic;

    // The state: disconnected, connecting, connected, reconnecting, closing
    this.state = 'disconnected';

    // The deferred promise to get a consumer connection
    this.deferred = null;

    // If the consumer is reconnecting, this is a reference to the timeout
    this.timeout = null;

    // A groupId for this consumer.  If this connection is reconnected, the same groupId will be used.
    this.groupId = groupId;

    // This will be set to true once we've initialized the offsets for this groupId.
    this.offsetsInitialized = (currentOffset === true);

    // Any extraOptions we want to provide to kafka-node consumer
    this.extraOptions = extraOptions;

    // This class is an EventEmitter;

    // Listen to our own error event, to avoid throwing exceptions out to NodeJS
    this.on('error', (err) => {
        // Ignore, this error will already have been logged elsewhere

    // Immediately initiate a request to connect
util.inherits(KafkaConsumer, events.EventEmitter);

 * @type {number} The number of milliseconds to wait before reconnecting.  This can be overridden for each connection,
 *   or in the prototype for all connections, or in the config.
KafkaConsumer.prototype.retryMs = (null != config.kafka && null != config.kafka.kafkaRetryMs) ? config.kafka.kafkaRetryMs : 3000;

 * @type {number} The Kafka partition from which to get the offset.  This defaults to 0.
KafkaConsumer.prototype.partition = 0;

/**
 * Reports whether this connection is still waiting on its consumer promise.
 * @returns {boolean} False when there is no deferred; otherwise whatever the deferred promise's
 *   isPending() reports.
 */
KafkaConsumer.prototype.isPending = function() {
    // `null != x` is deliberately loose: it treats both null and undefined as "no deferred".
    return null != this.deferred && this.deferred.promise.isPending();
};

/**
 * Connect a consumer to this connection's topic (read from `this.topic`; the method takes no arguments).
 * If an active consumer already exists, it will be immediately returned; otherwise a new consumer will
 * be created.
 * @returns A promise to return the consumer.
 */
KafkaConsumer.prototype.getConsumer = function() {
    // NOTE(review): this copy of the method is truncated.  Closing braces/parentheses are absent and
    // several statements that the surviving comments describe are not present.  Each gap is flagged
    // below; restore the missing lines from version control rather than from these notes.
    let self = this;

    // Initialize the deferred promise, if we don't already have one.
    self.deferred = self.deferred || q.defer();

    // If we are closing the connection, make sure the promise is actually resolved
    if (self.state === 'closing' && self.deferred.promise.isPending()) {
        // NOTE(review): the statement that settles the promise is missing from this copy.

    // Unless we are disconnected, simply return the existing promise.
    if (self.state !== 'disconnected') {
        return self.deferred.promise;

    // At this point, we know we are in the disconnected state.  Try initiating a connection.

    self.state = 'connecting';
    // Keep an existing groupId (caller-supplied or from an earlier attempt); otherwise generate one.
    self.groupId = self.groupId || createGroupId(self.topic);

    // Try to connect, creating a new consumer each time.
    getConsumer(self.topic, self.groupId, self.extraOptions).then((consumer) => {
        // If the connection was closed in the meantime, it's possible the deferred object has been cleared.
        // If so, we don't care about the response.
        if (null != self.deferred) {
            self.state = 'connected';

            // Since each consumer has its own group Id, the offsets by default will reset to the beginning
            // of the topic since the group hasn't consumed anything yet.  Instead, we want to start at
            // the end of the topic, but do it in a partition-agnostic way.
            if (!self.offsetsInitialized) {

                // Pause the consumer until we've got the correct offsets.
                // NOTE(review): the pause statement itself is missing from this copy.

                // Replace the default rebalance event listeners with our own
                // (the current listeners are captured so they can be re-registered once the commit succeeds).
                // NOTE(review): no statement removing the existing listeners is visible here.
                let rebalanceListeners = consumer.listeners('rebalanced');

                // The first time the consumer is rebalanced, get the offsets
                consumer.once('rebalanced', () => {
                    // Get the last offset for each topic
                    // (one offset-request entry is built per payload reported by the consumer).
                    let payloads = consumer.getTopicPayloads().map((topic) => {
                        return {
                            topic: topic.topic,
                            partition: Number(topic.partition),
                            time: -1, // Specify -1 to receive the latest offsets
                            maxNum: 1 // The number of values to retrieve

                    consumer.offsetRequest(payloads, (err, offsets) => {
                        if (err) {
                            // Route the failure through the consumer's own 'error' event.
                            consumer.emit('error', err);
                        else {

                            // Condense multiple offset values
                            // (each partition's list of offsets is replaced by its smallest value).
                            for (let topic in offsets) {
                                for (let partition in offsets[topic]) {
                                    offsets[topic][partition] = Math.min.apply(null, offsets[topic][partition]);
                          // NOTE(review): the call that takes the following template literal as its argument is missing.
                          `Kafka Consumer: Connected to topic ${topic}, partition: ${partition}, offset: ${offsets[topic][partition]}`);

                            // Update the offsets and commit them
                            consumer.updateOffsets(offsets, true);
                            consumer.commit(false, (err) => {
                                if (err) {
                                    consumer.emit('error', err);
                                else {
                                    // Offsets only need initializing once per groupId; later reconnects skip this branch.
                                    self.offsetsInitialized = true;

                                    // Re-register the previous listeners
                                    rebalanceListeners.forEach((listener) => {
                                        consumer.on('rebalanced', listener);

                                    // The consumer is now ready to start reading from the topic
                                    consumer.ready = true;

            // If the consumer receives an error, try to reconnect
            consumer.on('error', (err) => {

                // If the promise still hasn't been resolved, we need to reject it
                // before we create a new one; otherwise clients listening to the old
                // promise could hang forever.
                if (self.isPending()) {
                    // NOTE(review): the rejecting statement is missing from this copy.

                // We only need to handle this error if we are actually connected.
                // Otherwise, we are probably trying to close the connection.
                if (self.state === 'connected') {
                    // NOTE(review): the left-hand operand of the comparison below is missing from this copy.
                    if ( === 'TopicsNotExistError') {
                        logger.warn(`Kafka Consumer: ${err.message}`);
                    else {
                        logger.error(err, 'Kafka Consumer: Connection to zookeeper failed.');

                    // Re-emit this error to listeners.
                    self.emit('error', err);

                    // Try to reconnect
                    // NOTE(review): the reconnect statement is missing from this copy.

            // If the consumer receives a message, re-emit to our own event handler.
            // This allows consumers to bind to the KafkaConsumer object rather than to a Consumer
            // that may need to reconnect.
            consumer.on('message', (message) => {
                // Re-emit the message to listeners of this wrapper
                self.emit('message', message);

            // Notify listeners that a connection was made.
            // NOTE(review): the notifying statement is missing from this copy.

            // Resolve the deferred promise with the consumer.  The next time getConsumer() is called,
            // the consumer will immediately be returned.
            // NOTE(review): the resolving statement is missing from this copy.
    }).fail((err) => {
        logger.error(err, 'Kafka Consumer: error creating consumer');

        // Re-emit this error to listeners.
        self.emit('error', err);

        // There was some sort of error creating the connection, so reject the connection promise.
        // It's possible that at this point the state was lost, so check for nulls first.
        if (self.isPending()) {
            // NOTE(review): the rejecting statement is missing from this copy.

        // Then try connecting again, which should create a new promise.
        // NOTE(review): the reconnect statement is missing from this copy.

    return self.deferred.promise;

/**
 * Disconnects this consumer from its topic, closing all necessary resources and canceling any pending reconnects.
 */
// NOTE(review): this copy of the method is truncated.  The call that takes the template literal on
// the next line as its argument is missing, as are closing braces and the statements flagged below.
KafkaConsumer.prototype.close = function() {`Kafka Consumer: Disconnecting zookeeper topic ${this.topic}`);

    // Explicitly close the connection (but only do this if we were actually connected)
    // NOTE(review): the statement that performs the close is missing from this copy.

    this.state = 'disconnected';

    if (this.isPending()) {
        // NOTE(review): the body of this branch is missing; presumably it settles the pending promise (confirm).
    // Drop the deferred so a later getConsumer() call creates a fresh one.
    this.deferred = null;

    // If there is a reconnect timeout, cancel it
    if (null != this.timeout) {
        // NOTE(review): the statement that cancels the timer is missing; only the reference is cleared here.
        this.timeout = null;

/**
 * Reconnects to this connection's topic (read from `this.topic`; the method takes no arguments) after a delay.
 * @returns The promise of the connection's current deferred.
 */
KafkaConsumer.prototype.reconnect = function() {
    // NOTE(review): this copy of the method is truncated.  Closing braces are absent and several
    // statements described by the surviving comments are missing; each gap is flagged below.
    let self = this;

    // If a reconnection is already scheduled, don't do it again.
    // NOTE(review): the call that takes the template literal on the next line as its argument is missing.
    if (null == self.timeout) {`Kafka Consumer: Attempting to reconnect zookeeper topic ${self.topic} in ${self.retryMs} ms`);

        // Let listeners know we are reconnecting.  They will subsequently also get 'disconnect' and 'connect' notifications.
        // NOTE(review): the notifying statement is missing from this copy.

        // Create a new deferred promise.  Subsequent calls to connect will get this promise even before the
        // reconnection has occurred.
        // Explicitly close the connection (but only do this if we were actually connected)
        // NOTE(review): the statement that performs the close is missing from this copy.

        self.state = 'reconnecting';

        if (self.isPending()) {
            // NOTE(review): the body of this branch is missing; presumably it settles the old promise (confirm).
        self.deferred = q.defer();

        // Setup a timeout
        // (the delay is self.retryMs milliseconds).
        self.timeout = setTimeout(() => {
            // Clear the timer reference and return to 'disconnected' once the delay has elapsed.
            self.timeout = null;
            self.state = 'disconnected';

            // Trigger the connection to be recreated.  This will also fulfill the promise returned from the
            // reconnect() function.
            // NOTE(review): the statement that triggers the new connection is missing from this copy.
        }, self.retryMs);
    return self.deferred.promise;

// Export API
module.exports = KafkaConsumer;