// packages/plugin-kafka/src/custom/consumer-stream.ts

import { Readable } from 'stream'
import { strict as assert } from 'assert'
import { once } from 'events'
import { uniqWith, isEqual } from 'lodash'
import { promisify, delay } from 'bluebird'
import type { Logger } from '@microfleet/plugin-logger'
import type { ConsumerStreamOptions } from '@microfleet/plugin-kafka-types'
import { KafkaConsumer, Message, LibrdKafkaError } from './rdkafka-extra'
import { OffsetCommitError, CriticalErrors, RetryableErrors, UncommittedOffsetsError, Generic /*, CommitTimeoutError*/ } from './errors'
import { TopicPartitionOffset, SubscribeTopicList, Assignment, EofEvent } from '@makeomatic/node-rdkafka'

export type CommitOffsetTracker = Map<string, TopicPartitionOffset>
export const EVENT_OFFSET_COMMIT_ERROR = 'offset.commit.error'

const isTopicPartitionOffset = (obj: any): obj is TopicPartitionOffset => {
  return obj !== null && typeof obj === 'object' && Object.prototype.hasOwnProperty.call(obj, 'offset')
}

// matches librdkafka's RD_KAFKA_OFFSET_INVALID sentinel
export const UNKNOWN_OFFSET = -1001
export type OffsetCommitErrorHandler = (err: OffsetCommitError) => boolean

type CommitOffsetTrackerObject = Record<string, TopicPartitionOffset>

export type TrackerMeta = {
  offsetTracker: CommitOffsetTrackerObject,
  unacknowledgedTracker: CommitOffsetTrackerObject,
}

/**
 * Helps to read data from a Kafka topic.
 * Tracks the consumer offset position and can exit on EOF.
 * Replaces `node-rdkafka/ConsumerStream`.
 */
export class KafkaConsumerStream extends Readable {
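  // builds a tracker map key, e.g. { topic: 'logs', partition: 3 } -> 'logs_3'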
  private static trackingKey(topicPart: Assignment): string {
    return `${topicPart.topic}_${topicPart.partition}`
  }

  public consumer: KafkaConsumer
  private config: ConsumerStreamOptions
  private fetchSize: number
  private offsetQueryTimeout: number

  private offsetTracker: CommitOffsetTracker
  private unacknowledgedTracker: CommitOffsetTracker
  private partitionEofs: CommitOffsetTracker

  private log?: Logger
  private endEmitted: boolean

  private topics: SubscribeTopicList
  private messages: (Message | Message[])[]
  private autoStore: boolean
  private readStarted: boolean
  private hasError: boolean
  private externalOffsetCommitErrorHandler: OffsetCommitErrorHandler

  /**
   * @param consumer Connected kafka consumer
   * @param config Topic configuration
   * @param log Optional logger; a child logger bound to the subscribed topics is created from it
   */
  constructor(consumer: KafkaConsumer, config: ConsumerStreamOptions, log?: Logger) {
    assert(consumer instanceof KafkaConsumer, 'should be an instance of KafkaConsumer')
    assert(consumer.isConnected(), 'consumer should be connected')
    const fetchSize = config.fetchSize || 1
    const highWaterMark = config.streamAsBatch ? 1 : fetchSize

    super({ highWaterMark, objectMode: true, emitClose: true })
    this.config = config
    this.readStarted = false
    this.endEmitted = false
    this.hasError = false
    this.fetchSize = fetchSize

    this.offsetQueryTimeout = config.offsetQueryTimeout || 200

    this.offsetTracker = new Map()
    this.unacknowledgedTracker = new Map()
    this.partitionEofs = new Map()

    this.consumer = consumer
    this.autoStore = config.autoOffsetStore !== false

    this.messages = []
    this.closeAsync = promisify(this.close, { context: this })

    this.handleRebalance = this.handleRebalance.bind(this)
    this.handleOffsetCommit = this.handleOffsetCommit.bind(this)
    this.externalOffsetCommitErrorHandler = () => true
    this.handleDisconnected = this.handleDisconnected.bind(this)
    this.handleUnsubscribed = this.handleUnsubscribed.bind(this)
    this.handleEOF = this.handleEOF.bind(this)

    this.consumer.on('rebalance', this.handleRebalance)
    this.consumer.on('offset.commit', this.handleOffsetCommit)
    this.consumer.on('disconnected', this.handleDisconnected)
    this.consumer.on('partition.eof', this.handleEOF)

    // we should start a graceful shutdown on unsubscribe,
    // otherwise the stream could misbehave
    this.consumer.on('unsubscribed', this.handleUnsubscribed)

    this.topics = Array.isArray(config.topics) ? config.topics : [config.topics]

    // remap names for better logging
    if (log) this.log = log.child({ topics: this.topics.map(x => String(x)) })

    // to avoid race conditions during rebalance while consumers connect in parallel,
    // we subscribe before reading starts
    this.consumer.subscribe(this.topics)

    this.once('end', () => {
      this.endEmitted = true
    })
  }

  /**
   * Commit locally stored or provided offsets
   * @param offsets Offsets to commit
   */
  public commit(offsets: TopicPartitionOffset[] = [...this.unacknowledgedTracker.values()]): void {
    this.consumer.commit(offsets)
  }

  /**
   * Commits offsets for the provided messages and waits for the commits to come through
   * @param messages Messages whose offsets should be committed
   */
  public async commitMessages(messages: Message[]): Promise<void> {
    const offsets: TopicPartitionOffset[] = messages.map(m => ({
      topic: m.topic,
      partition: m.partition,
      offset: m.offset + 1,
    }))

    this.commit(offsets)
    this.log?.debug({ offsets }, 'commitAsync: pre')
    while (offsets.length > 0 && !this.consumerDisconnected()) {
      const [err, positions] = await once(this.consumer, 'offset.commit') as [LibrdKafkaError | null, TopicPartitionOffset[]]
      this.log?.debug({ positions }, 'commitAsync: received')
      if (err) {
        if (!RetryableErrors.includes(err.code)) {
          throw err
        }
        // retryable errors are re-committed by handleOffsetCommit - stop waiting here
        return
      }

      for (const position of positions) {
        const committedOffset = offsets.findIndex(x => x.offset === position.offset)
        if (committedOffset !== -1) {
          offsets.splice(committedOffset, 1)
        }
      }
    }
  }
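
  /*
   * Sketch of a manual-commit consumption flow built on commitMessages()
   * (illustrative: assumes the stream was created with autoOffsetStore set
   * to false and streamAsBatch off; processMessage is a hypothetical handler):
   *
   *   for await (const message of stream) {
   *     await processMessage(message)
   *     await stream.commitMessages([message])
   *   }
   */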

  /**
   * Sets a custom offset commit error handler. If the handler returns true, the stream throws and exits
   * @param handler Handler executed when an offset commit error is received
   */
  public setOnCommitErrorHandler(handler: OffsetCommitErrorHandler): void {
    this.externalOffsetCommitErrorHandler = handler
  }
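
  /*
   * Example handler (a sketch mirroring the default behaviour, with logging):
   * returning true destroys the stream with the wrapped OffsetCommitError,
   * returning false lets it continue (critical errors still destroy it):
   *
   *   stream.setOnCommitErrorHandler((err) => {
   *     log.error({ err }, 'offset commit failed')
   *     return true
   *   })
   */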

  public get trackerMeta(): TrackerMeta {
    return {
      offsetTracker: Object.fromEntries(this.offsetTracker),
      unacknowledgedTracker: Object.fromEntries(this.unacknowledgedTracker),
    }
  }

  public _read(): void {
    this.log?.debug('read')

    if (this.messages.length > 0) {
      const message = this.messages.shift()
      this.push(message)
      return
    }

    if (!this.readStarted) {
      this.readStarted = true
      this.readLoop()
      return
    }
  }

  public _destroy(err: Error | null | undefined, callback?: (err: Error | null) => void): void {
    const next = () => {
      if (callback) callback(err || null)
    }

    this.log?.debug({ err, disconnected: this.consumerDisconnected() }, '____destroy')
    if (this.consumerDisconnected()) {
      next()
      return
    }

    this.consumer.disconnect(next)
  }

  public close(cb?: (err?: Error | null, result?: any) => void): void {
    if (this.endEmitted || this.consumerDisconnected()) {
      if (cb) cb()
      return
    }
    this.consumer.disconnect(cb)
  }

  private inDestroyingState(): boolean {
    return this.consumerDisconnected()
  }

  private consumerDisconnected(): boolean {
    return this.consumer._isDisconnecting || !this.consumer.isConnected()
  }

  private handleUnsubscribed(): void {
    this.log?.debug('unsubscribed from topics - quitting')
    this.close()
  }

  private async handleEOF(eof: EofEvent): Promise<void> {
    // mark the event so that the EOF entry can be distinguished in the trackers
    eof.eof = true

    this.log?.info({ eof }, 'eof event')
    await this.updatePartitionEof(eof, this.partitionEofs)
  }

  private async handleOffsetCommit(err: LibrdKafkaError | null | undefined, partitions: TopicPartitionOffset[]): Promise<void> {
    if (err) {
      const wrappedError = new OffsetCommitError(partitions, this.trackerMeta, err)

      this.log?.warn({ err: wrappedError }, '[commit] offset commit error')

      if (RetryableErrors.includes(err.code) && !this.consumerDisconnected()) {
        this.log?.info({ err, partitions }, '[commit] retry offset commit')
        this.consumer.commit(partitions)
        return
      }

      const isCritical = CriticalErrors.includes(err.code)
      const handlerResult = this.externalOffsetCommitErrorHandler(wrappedError)

      if (handlerResult || isCritical) {
        this.log?.error({ err: wrappedError }, '[commit] critical commit error')
        this.destroy(wrappedError)
        return
      }
    }

    if (this.endEmitted) {
      this.log?.debug('end emitted, not handling')
      return
    }

    if (!err) {
      this.log?.debug({ partitions }, 'handle offset.commit')
      this.updatePartitionOffsets(partitions, this.offsetTracker)
    }

    if (!this.hasOutstandingAcks()) {
      await this.checkEof()
    }
  }

  private handleDisconnected(): void {
    this.log?.info('handle disconnected')

    if (this.hasOutstandingAcks()) {
      this.destroy(
        new UncommittedOffsetsError(
          Object.fromEntries(this.offsetTracker.entries()),
          Object.fromEntries(this.unacknowledgedTracker.entries())
        )
      )
      return
    }

    if (!this.endEmitted && !this.hasError) {
      this.log?.debug('pushing end from handleDisconnected()')
      this.push(null)
    }

    this.destroy()
  }

  private async handleRebalance(err: LibrdKafkaError, assignments: Assignment[] = []) {
    this.log?.debug({ err, assignments }, 'rebalance')

    switch (err.code) {
      case Generic.ERR__ASSIGN_PARTITIONS:
        try {
          // eslint-disable-next-line no-case-declarations
          const committedOffsets = await this.consumer.committedAsync(assignments, this.offsetQueryTimeout)
          this.updatePartitionOffsets(committedOffsets, this.offsetTracker)
          this.consumer.assign(committedOffsets)
        } catch (assignError: any) {
          this.log?.error(assignError, 'ConsumerStream assign partition handler error')
          this.consumer.unassign()
          this.destroy(assignError)
        }
        break

      case Generic.ERR__REVOKE_PARTITIONS:
        if (!this.consumerDisconnected()) {
          this.cleanPartitionOffsets(assignments, this.offsetTracker)
          this.cleanPartitionOffsets(assignments, this.unacknowledgedTracker)
          this.cleanPartitionOffsets(assignments, this.partitionEofs)
        }
        this.consumer.unassign()
        break

      default:
        this.emit('rebalance.error', err)
        return
    }
  }

  private handleIncomingMessages(messages: Message[]): void {
    const { unacknowledgedTracker, autoStore } = this

    if (!this.consumerDisconnected()) {
      this.consumer.pause(this.consumer.assignments())
    }

    // Avoid duplicates. Sometimes node-rdkafka returns duplicate messages
    const uniqMessages = uniqWith(messages, isEqual)

    // Drop messages whose offset is below the tracked (already delivered) offset
    const exceptPreviousOffset = uniqMessages.filter((message) => {
      const trackingKey = KafkaConsumerStream.trackingKey(message)
      const storedOffset = unacknowledgedTracker.get(trackingKey)?.offset || UNKNOWN_OFFSET
      return storedOffset <= message.offset
    })

    if (uniqMessages.length !== messages.length) {
      this.log?.warn({ uniqLength: uniqMessages.length, origLength: messages.length }, '[dup] Duplicates received')
    }

    if (exceptPreviousOffset.length !== uniqMessages.length) {
      this.log?.warn({ filtered: exceptPreviousOffset.length, uniqMessages: uniqMessages.length }, '[dup] Previous offset data received')
    }

    for (const message of uniqMessages) {
      const topicPartition: TopicPartitionOffset = {
        topic: message.topic,
        partition: message.partition,
        // Kafka commit offsets point at the next message to be consumed
        offset: message.offset + 1,
      }

      this.updatePartitionOffsets([topicPartition], unacknowledgedTracker)
    }

    this.log?.debug({ autoStore, offsets: [...unacknowledgedTracker.values()] }, 'Before offset store')

    // We already have all max offsets inside unacknowledgedTracker. Let's mark them for commit
    if (autoStore && !this.consumerDisconnected()) this.consumer.offsetsStore([...unacknowledgedTracker.values()])

    if (this.config.streamAsBatch) {
      this.messages.push(exceptPreviousOffset)
    } else {
      this.messages.push(...exceptPreviousOffset)
    }

    // transfer messages from local buffer to the stream buffer
    this._read()
  }
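
  // Worked example of the dedup filter above: if unacknowledgedTracker holds
  // offset 10 for a partition (i.e. message 9 was the last one delivered,
  // because the tracker stores message.offset + 1), a redelivered message at
  // offset 9 is dropped (10 <= 9 is false) while a fresh message at offset 10
  // passes the filter.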

  // loop until the consumer disconnects or the stream ends
  private async readLoop(): Promise<void> {
    while (!this.consumerDisconnected() && !this.endEmitted) {
      // while the consumer is disconnecting, consume throws 'KafkaConsumer is not connected'
      // when not batching, fetch only as much as fits into the stream buffer
      const bufferAvailable = this.readableHighWaterMark - this.readableLength
      const fetchSize = this.config.streamAsBatch ? this.fetchSize : bufferAvailable

      try {
        this.log?.debug('read loop messages queued')
        const messages = await this.consumer.consumeAsync(fetchSize)
        this.log?.debug('read loop messages returned: %d', messages.length)

        if (messages.length > 0) this.handleIncomingMessages(messages)
        if (this.config.waitInterval) await delay(this.config.waitInterval)
      } catch (err: any) {
        this.log?.error({ err }, 'consume error')

        if (err.code === Generic.ERR_UNKNOWN_TOPIC_OR_PART) {
          this.log?.warn({ err }, "topic doesn't exist - closing")
          this.close()
        } else if (err.code !== Generic.ERR_UNKNOWN) {
          // broker transport errors arrive with code -1 (ERR_UNKNOWN) and are retryable,
          // so we keep looping on them; any other error is fatal
          this.destroy(err)
          return
        }
      }
    }

    this.readStarted = false
  }

  private async checkEof(): Promise<void> {
    if (this.inDestroyingState() || !this.config.stopOnPartitionsEOF) {
      this.log?.debug('checkEof: skipped - destroying or stopOnPartitionsEOF disabled')
      return
    }

    // wrap the asynchronous operations - the consumer state may change while we await
    try {
      let eofReached = true
      for (const val of this.offsetTracker.values()) {
        if (!val.eof) {
          eofReached = false
          break
        }
      }

      if (eofReached) {
        this.log?.debug('eof reached')
        await this.closeAsync()
        return
      }
    } catch (err: any) {
      this.log?.error({ err }, 'check eof error')

      if (err.code !== Generic.ERR__STATE || !this.inDestroyingState()) {
        this.destroy(err)
        return
      }
    }

    if (!this.inDestroyingState()) {
      this.consumer.resume(this.consumer.assignments())
    }
  }

  /**
   * Determines whether we have outstanding acknowledgements to be written
   * to the broker
   */
  private hasOutstandingAcks(): boolean {
    const { offsetTracker, unacknowledgedTracker } = this

    for (const partition of offsetTracker.values()) {
      const trackerKey = KafkaConsumerStream.trackingKey(partition)
      const latestMessage = unacknowledgedTracker.get(trackerKey)
      if (!latestMessage) {
        continue
      }

      // the latest delivered offset must not exceed the committed offset,
      // otherwise we still need to wait for acks
      if (latestMessage.offset > partition.offset) {
        this.log?.debug(this.trackerMeta, 'hasOutstandingAcks: true')
        return true
      }
    }

    this.log?.debug(this.trackerMeta, 'hasOutstandingAcks: false')
    return false
  }
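
  // Worked example: with offsetTracker holding offset 5 for a partition
  // (committed) and unacknowledgedTracker holding offset 7 (delivered),
  // 7 > 5 means acks are outstanding until the broker confirms offset 7.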

  private async updatePartitionEof(topicPartition: TopicPartitionOffset, map: CommitOffsetTracker): Promise<void> {
    const trackingKey = KafkaConsumerStream.trackingKey(topicPartition)

    map.set(trackingKey, topicPartition)

    const currentOffsetData = this.offsetTracker.get(trackingKey)
    const currentOffset = currentOffsetData?.offset || UNKNOWN_OFFSET

    this.log?.debug({ currentOffsetData, topicPartition }, 'eof check')

    if (
      currentOffsetData && (
        currentOffset === topicPartition.offset || currentOffset === UNKNOWN_OFFSET
      )
    ) {
      currentOffsetData.eof = true
    }

    const reachedPossibleEOF = topicPartition.offset === 0 || currentOffset !== UNKNOWN_OFFSET

    if (reachedPossibleEOF && !this.hasOutstandingAcks()) {
      await this.checkEof()
    }
  }

  private updatePartitionOffsets(partitions: Assignment[], map: CommitOffsetTracker): void {
    for (const topicPartition of partitions) {
      const trackingKey = KafkaConsumerStream.trackingKey(topicPartition)

      // if it carries an offset - verify that the tracked offset is not greater
      if (isTopicPartitionOffset(topicPartition)) {
        const currentOffsetData = map.get(trackingKey)
        const currentOffset = currentOffsetData?.offset || UNKNOWN_OFFSET

        if (currentOffset <= topicPartition.offset || !currentOffsetData) {
          map.set(trackingKey, topicPartition)
        }
      // if it carries no offset, it's a new assignment - track it with the UNKNOWN_OFFSET sentinel
      } else if (!map.has(trackingKey)) {
        map.set(trackingKey, {
          topic: topicPartition.topic,
          partition: topicPartition.partition,
          offset: UNKNOWN_OFFSET
        })
      }
    }
  }

  private cleanPartitionOffsets(partitions: Assignment[], map: CommitOffsetTracker): void {
    for (const topicPartition of partitions) {
      map.delete(KafkaConsumerStream.trackingKey(topicPartition))
    }
  }
}

export interface KafkaConsumerStream extends Readable {
  closeAsync(): PromiseLike<void>
}
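
/*
 * End-to-end usage sketch (illustrative: broker address, group id and topic
 * are assumptions, and error handling is elided):
 *
 *   const consumer = new KafkaConsumer({
 *     'metadata.broker.list': 'localhost:9092',
 *     'group.id': 'example-group',
 *     'enable.partition.eof': true,
 *   }, {})
 *   consumer.connect()
 *   consumer.once('ready', async () => {
 *     const stream = new KafkaConsumerStream(consumer, {
 *       topics: 'example-topic',
 *       stopOnPartitionsEOF: true,
 *     })
 *     for await (const message of stream) {
 *       // process the message
 *     }
 *     await stream.closeAsync()
 *   })
 */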