ghermeto/kafka-observable

View on GitHub
lib/client/producer.js

Summary

Maintainability
A
0 mins
Test Coverage
/**
 * Kafka Producer Adapter. It adapts no-kafka producer to the format expected
 * by the KafkaObservable interface
 *
 * @example <caption>producer</caption>
 * const kafkaClient = require('./lib/client');
 * const producer = kafkaClient.producer({brokers: 'https://exemple.com:7123'});
 *
 * const message = { hello: 'world', ok: true };
 * producer.on('error', err => console.error(err));
 * producer.publish('my_topic', message);
 *
 * @author ghermeto
 **/

const Kafka = require('no-kafka');
const Base = require('./base');
const shortid = require('shortid');

/**
 * @class Producer
 */
class Producer extends Base {
    constructor(opts) {
        super(opts);
        const options = this.clientOptions();
        this.producer = new Kafka.Producer(Object.assign({}, options));
    }

    /**
     * @return {Object} client options
     */
    clientOptions() {
        const base = super.clientOptions();
        const fixed = { partitioner: this.partitioner(this.opts.partitioner) };
        return Object.assign({}, base, fixed);
    }

    /**
     * Dynamically generates producer default configuration properties
     * @private
     * @override
     * @returns {Object} Default configs
     */
    defaults() {
        const base = super.defaults();
        const defaults = { timeout: 1000 };
        return Object.assign({}, base, defaults);
    }

    /**
     * Initializes the producer if necessary or use the cached producer
     * @private
     * @return {Promise} active producer
     */
    init() {
        if (this.active) { return this.active; }
        this.active = this.producer.init();
        return this.active;
    }

    /**
     * Returns a partitioner instance from a string or from a class.
     * @private
     * @param {String|Object} nameOrClass name of the partitioner or class
     * @return {Object} partitioner instance
     */
    partitioner(nameOrClass = 'Default') {
        if (typeof nameOrClass === 'object') {
            return new nameOrClass();
        }
        const className = `${nameOrClass}Partitioner`;
        if (!Kafka[className]) {
            this.log.warn(`Partitioner ${className} not found. Using default.`);
            return new Kafka.DefaultPartitioner();
        }
        return new Kafka[className]();
    }

    /**
     * Serializes messages
     * @static
     * @private
     * @param {Object|String} message
     * @return {String} serialized message
     */
    static serialize(message) {
        return typeof message === 'string' ? message : JSON.stringify(message);
    }

    /**
     * Returns the producer client once it has sent at least one message.
     * @return {Kafka.Producer|Producer}
     */
    client() {
        return this.producer;
    }

    /**
     * Terminates the client connection.
     * @return {Promise}
     */
    end() {
        return this.client().end();
    }

    /**
     * Publishes a message to Kafka
     * @param {String} topic topic to publish on
     * @param {Object|String} message message to be sent
     * @param {String} key message key
     * @return {Promise}
     */
    publish(topic, message, key = `${this.clientId}-${shortid.generate()}`) {
        const serialized = Producer.serialize(message);
        const payload = {topic: topic, message: {key: key, value: serialized}};

        return this.init()
            .then(() => this.producer.send(payload))
            .catch(err => this.handleError(err));
    }
}

module.exports = Producer;