
# Microfleet Kafka Plugin

Adds Kafka support to Microfleet. Provides a stream-like API for sending messages to a Kafka broker.

For more information please read about [node-rdkafka](

## Install

`yarn add @microfleet/plugin-kafka`

## Configuration

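To make use of the plugin, adjust the Microfleet configuration in the following way:

```js
exports.plugins = [
  // ...other plugins
  'kafka', // assumed plugin name, matching the `kafka` config key below
]

exports.kafka = {
  // librdkafka configuration
  debug: 'consumer,cgrp,topic,fetch',
  '': 'kafka:9092',
  '': 'test-group',
}
```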

## Interface

The Microfleet Kafka Plugin extends the service interface with the following methods:

### async service.kafka.createReadStream({ streamOpts, conf, topic }): Readable

Initializes a Kafka consumer using the provided params and creates a Readable stream.
This is a reimplementation of the `node-rdkafka` `ConsumerStream` with some additions.

Extra parameters:

```js
const streamOpts = {
  checkTopicExists: boolean, // Check whether the consumed topics exist.
  stopOnPartitionsEOF: boolean, // Stop the stream once all assigned partitions have been read.
  offsetQueryTimeout: number, // Timeout for broker requests, in milliseconds.
  offsetCommitTimeout: number, // Time to wait for offset commits on stream close, in milliseconds.
}
```
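
To illustrate what "all assigned partitions read" means for `stopOnPartitionsEOF`, here is a minimal sketch of how end-of-partition events could be tracked. `PartitionEofTracker` is a hypothetical helper written for this example. It is not part of the plugin, and the plugin's actual implementation may differ.

```javascript
// Hypothetical helper, not part of @microfleet/plugin-kafka.
// Tracks which assigned partitions have not yet reached their end.
class PartitionEofTracker {
  constructor() {
    this.pending = new Set()
  }

  // Builds a unique key for a topic/partition pair.
  static key({ topic, partition }) {
    return `${topic}:${partition}`
  }

  // Records the current assignment, e.g. after a rebalance.
  assign(assignments) {
    this.pending = new Set(assignments.map(PartitionEofTracker.key))
  }

  // Marks one partition as fully read.
  // Returns true when no assigned partition is left pending.
  markEof(topicPartition) {
    this.pending.delete(PartitionEofTracker.key(topicPartition))
    return this.pending.size === 0
  }
}
```

A stream built on this idea would end itself as soon as `markEof` returns `true`.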

### async service.kafka.createWriteStream({ streamOpts, conf, topic }): Writable

Initializes a Kafka producer using the provided params and creates a Writable stream.
Detailed docs here -
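
Because the producer is exposed as a Writable stream, the usual Node.js backpressure rules should apply. The sketch below shows one way to respect them. `writeAll` and `createMemorySink` are hypothetical names, and an in-memory `Writable` stands in for the real producer stream so the example runs without a broker.

```javascript
const { Writable } = require('node:stream')
const { once } = require('node:events')

// Writes every message in order, pausing whenever the stream reports
// that its internal buffer is full.
async function writeAll(stream, messages) {
  for (const message of messages) {
    // write() returns false once the buffer reaches highWaterMark
    if (!stream.write(message)) {
      await once(stream, 'drain')
    }
  }
}

// Stand-in for the producer stream (assumption for this example):
// an object-mode Writable that stores messages in the given array.
function createMemorySink(received) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(message, _encoding, callback) {
      received.push(message)
      setImmediate(callback)
    },
  })
}
```

With a real producer stream, `writeAll(producerStream, messages)` would be called the same way, assuming the stream emits `drain` like any standard Writable.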

## Parameter description

For information about parameters passed to the interface methods:

* `streamOpts` - See [this](../plugin-kafka-types/index.d.ts) for `ConsumerStream` or [this]( for `ProducerStream`
* `conf` - See [this page](
* `topic` - See [this page](

## Example

```js
const producerStream = await service.kafka.createProducerStream({
  streamOptions: { objectMode: true, pollInterval: 10 },
  conf: { '': 'other-group' },
})

const consumerStream = await service.kafka.createConsumerStream({
  streamOptions: { topics: topic, streamAsBatch: true, fetchSize: 10 },
  conf: {
    debug: 'consumer',
    '': false,
    '': 'someid',
    '': 'other-group',
  },
  topic: {
    // 'earliest' | 'latest': 'earliest' starts from the last committed offset,
    // 'latest' starts from the last received message.
    'auto.offset.reset': 'earliest',
  },
})

// and then
producerStream.write({
  topic,
  value: Buffer.from(`message at ${Date.now()}`),
}, cb)

// or
producerStream.write(Buffer.from(`message at ${Date.now()}`), cb)

for await (const message of consumerStream) {
  // process message
}
```
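
The consumer in the example is created with `streamAsBatch: true`. Assuming this behaves as in `node-rdkafka`, each chunk is then an array of messages rather than a single message. The sketch below shows how such batches could be unpacked. `createFakeBatchStream` and `collectValues` are hypothetical helpers, and `Readable.from` stands in for the real consumer stream.

```javascript
const { Readable } = require('node:stream')

// Stand-in for a consumer stream in batch mode (assumption for this
// example): every chunk is an array of message-like objects.
function createFakeBatchStream(batches) {
  return Readable.from(batches, { objectMode: true })
}

// Iterates over the batches and collects each message value as a string.
async function collectValues(stream) {
  const values = []
  for await (const batch of stream) {
    for (const message of batch) {
      values.push(message.value.toString())
    }
  }
  return values
}
```

The same nested loop would apply to `consumerStream` when batching is enabled. Without batching, the inner loop can be dropped.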