Enterprise-CMCS/macpro-appian-connector

View on GitHub
src/libs/topics-lib.js

Summary

Maintainability
B
5 hrs
Test Coverage
const _ = require("lodash");
import { Kafka, ResourceTypes } from "kafkajs";

/**
 * Reconciles the topics on a Kafka cluster with a desired topic configuration.
 *
 * Topics named in `topicsConfig` that do not exist yet are created, and
 * existing topics with fewer partitions than configured get partitions added.
 * Partitions are never removed (Kafka only allows adding them).
 *
 * The admin client connects over SSL with client id "admin" and is always
 * disconnected before this function settles, including when a step fails.
 *
 * @param {string} brokerString - Comma-separated list of broker addresses.
 * @param {Array<Object>} topicsConfig - Desired topics. Each entry is read for
 *   its `topic` (name) and `numPartitions` properties; entries for missing
 *   topics are passed unchanged to `admin.createTopics`.
 * @returns {Promise<void>} Resolves once creation and partition updates finish.
 * @throws Rejects with whatever error the admin client raises.
 */
export async function createTopics(brokerString, topicsConfig) {
  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers,
    ssl: true,
  });
  const admin = kafka.admin();

  await admin.connect();

  // Everything after connect() runs inside try/finally so that a failed
  // admin call cannot leave the connection open.
  try {
    // Fetch the topics from MSK, skipping names that start with "_"
    // (e.g. the "__" internal management topics).
    const existingTopicList = _.filter(
      await admin.listTopics(),
      (name) => !name.startsWith("_")
    );
    console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2));

    // Fetch the metadata for the topics in MSK.
    const topicsMetadata = _.get(
      await admin.fetchTopicMetadata({ topics: existingTopicList }),
      "topics",
      []
    );
    console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2));

    // Configured topics whose name is not among the existing topics.
    const topicsToCreate = _.differenceWith(
      topicsConfig,
      existingTopicList,
      (topicConfig, topicName) => _.get(topicConfig, "topic") === topicName
    );

    // Configured topics that already exist but currently have fewer
    // partitions than the configuration asks for.
    const topicsToUpdate = _.intersectionWith(
      topicsConfig,
      topicsMetadata,
      (topicConfig, topicMetadata) =>
        _.get(topicConfig, "topic") === _.get(topicMetadata, "name") &&
        _.get(topicConfig, "numPartitions") >
          _.get(topicMetadata, "partitions", []).length
    );

    // Shape expected by admin.createPartitions: target partition count per topic.
    const partitionConfig = _.map(topicsToUpdate, (topicConfig) => ({
      topic: _.get(topicConfig, "topic"),
      count: _.get(topicConfig, "numPartitions"),
    }));

    // Resource descriptors used to query the configuration of existing topics.
    const configOptions = _.map(topicsMetadata, (topicMetadata) => ({
      name: _.get(topicMetadata, "name"),
      type: _.get(ResourceTypes, "TOPIC"),
    }));

    // Query topic configuration (only logged below); skipped when there is
    // nothing to describe.
    const configs =
      configOptions.length !== 0
        ? await admin.describeConfigs({ resources: configOptions })
        : [];

    console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2));
    console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2));
    console.log(
      "Partitions to Update:",
      JSON.stringify(partitionConfig, null, 2)
    );
    console.log(
      "Topic configuration options:",
      JSON.stringify(configs, null, 2)
    );

    // Create topics that don't exist in MSK.
    await admin.createTopics({ topics: topicsToCreate });

    // If any topics have fewer partitions in MSK than in the configuration,
    // add those partitions.
    if (partitionConfig.length > 0) {
      await admin.createPartitions({ topicPartitions: partitionConfig });
    }
  } finally {
    await admin.disconnect();
  }
}

export async function deleteTopics(brokerString, topicList) {
  // Check that each topic in the list is something we can delete
  for (var topic of topicList) {
    if (!topic.match(/.*--.*--.*--.*/g)) {
      throw "ERROR:  The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g";
    }
  }

  const brokers = brokerString.split(",");

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokers,
    ssl: true,
    requestTimeout: 295000, // 5s short of the lambda function's timeout
  });
  var admin = kafka.admin();

  await admin.connect();

  const currentTopics = await admin.listTopics();

  let topicsToDelete = _.filter(currentTopics, function (currentTopic) {
    return topicList.some((pattern) => {
      return !!currentTopic.match(pattern);
    });
  });

  console.log(`Deleting topics:  ${topicsToDelete}`);
  await admin.deleteTopics({
    topics: topicsToDelete,
    timeout: 295000,
  });

  await admin.disconnect();
}