makeomatic/mservice

View on GitHub
packages/plugin-kafka/src/util.ts

Summary

Maintainability
A
35 mins
Test Coverage
import { TopicMetadata, SubscribeTopic, SubscribeTopicList } from '@makeomatic/node-rdkafka'
import { TopicNotFoundError } from './custom/errors'
import type { Level } from 'pino'

/**
 * Lookup table from the syslog severity levels used by `librdkafka`
 * to the corresponding logger level name.
 */
export const kafkaSeverityToLogMapping: { [level: number]: Level } = {
  0: 'fatal', // emergency
  1: 'fatal', // alert
  2: 'fatal', // critical
  3: 'error', // error
  4: 'warn', // warning
  5: 'info', // notice
  6: 'info', // informational
  7: 'debug', // debug
}

/**
 * Resolve the logger level name for a syslog severity level.
 * Levels missing from `kafkaSeverityToLogMapping` resolve to `'debug'`.
 * @param level syslog level
 */
export function getLogFnName(level: number): Level {
  const mappedLevel = kafkaSeverityToLogMapping[level]
  return mappedLevel ? mappedLevel : 'debug'
}

/**
 * Checks that every required topic is present in the topic metadata.
 *
 * A string topic must be equal to the name of one of the metadata topics;
 * a `RegExp` topic must match the name of at least one of them.
 *
 * @param data - List of topics received on Client.connect
 * @param topics - Required topic or list of required topics
 * @throws TopicNotFoundError when at least one required topic has no match in `data`
 */
export function topicExists(data: TopicMetadata[], topics: SubscribeTopic | SubscribeTopicList): void {
  const topicList = Array.isArray(topics) ? topics : [topics]

  for (const topic of topicList) {
    // Compare with the topic currently being checked rather than with the whole
    // list: otherwise one existing topic makes every other string topic look present.
    const found = data.some(({ name }) => (
      topic instanceof RegExp ? topic.test(name) : name === topic
    ))

    if (!found) throw new TopicNotFoundError('Missing consumer topic', topicList)
  }
}