Enterprise-CMCS/macpro-mako
lib/lambda/sinkChangelog.ts

Summary: Maintainability A (2 hrs) · Test Coverage F (0%)
import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord, opensearch } from "shared-types";
import {
  ErrorType,
  bulkUpdateDataWrapper,
  getTopic,
  logError,
} from "../libs/sink-lib";
import { Index } from "shared-types/opensearch";
const osDomain = process.env.osDomain;
if (!osDomain) {
  throw new Error("Missing required environment variable(s)");
}
const index: Index = `${process.env.indexNamespace}changelog`;
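// Illustrative only: with indexNamespace = "main-" (a hypothetical value),
// index resolves to "main-changelog".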

// One notable difference between this handler and sinkMain's: the order in
// which records are processed doesn't matter for the changelog. Each event is
// indexed as its own unique document (there is no upserting), so records can
// arrive in any order.
export const handler: Handler<KafkaEvent> = async (event) => {
  const loggableEvent = { ...event, records: "too large to display" };
  try {
    for (const topicPartition of Object.keys(event.records)) {
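      // getTopic derives the topic name from the "topic-partition" key, e.g.
      // (illustrative): "aws.onemac.migration.cdc-0" -> "aws.onemac.migration.cdc"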
      const topic = getTopic(topicPartition);
      switch (topic) {
        case undefined:
          logError({ type: ErrorType.BADTOPIC });
          throw new Error(`Unable to determine topic from partition key: ${topicPartition}`);
        case "aws.onemac.migration.cdc":
          // await legacyAdminChanges(
          //   event.records[topicPartition],
          //   topicPartition,
          // );
          // await onemac(event.records[topicPartition], topicPartition);
          await processAndIndex({
            kafkaRecords: event.records[topicPartition],
            index,
            osDomain,
            transforms: opensearch.changelog.transforms,
            topicPartition: topicPartition,
          });
          break;
      }
    }
  } catch (error) {
    logError({ type: ErrorType.UNKNOWN, metadata: { event: loggableEvent } });
    throw error;
  }
};
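
// A minimal sketch of the event shape this handler consumes (values are
// illustrative; the authoritative shape is shared-types' KafkaEvent). Keys of
// `records` are "topic-partition" strings; record keys/values are base64:
//
//   {
//     records: {
//       "aws.onemac.migration.cdc-0": [
//         { key: "<base64 id>", value: "<base64 JSON>", offset: 101, timestamp: 1700000000000 },
//       ],
//     },
//   }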

const processAndIndex = async ({
  kafkaRecords,
  index,
  osDomain,
  transforms,
  topicPartition,
}: {
  kafkaRecords: KafkaRecord[];
  index: Index;
  osDomain: string;
  transforms: any; // map of event name -> Zod transform (loosely typed for now)
  topicPartition: string;
}) => {
  const docs: Array<(typeof transforms)[keyof typeof transforms]["Schema"]> =
    [];
  for (const kafkaRecord of kafkaRecords) {
    console.log(JSON.stringify(kafkaRecord, null, 2));
    const { value, offset } = kafkaRecord;
    try {
      // If the record is a legacy tombstone (no value), skip it
      if (!value) {
        continue;
      }

      // Parse the kafka record's value
      const record = JSON.parse(decodeBase64WithUtf8(value));

      // If this isn't a mako-originated event, skip it
      // TODO: handle legacy events; for now, just skip them
      if (!record.event || record?.origin !== "mako") {
        continue;
      }

      // If the event is supported, transform it and push it to the docs array for indexing
      console.log("record event:", record.event);

      if (record.event in transforms) {
        const transformForEvent =
          transforms[record.event as keyof typeof transforms];

        // transform(offset) returns a Zod schema; the record's offset helps
        // derive a unique document id (cf. `${id}-${offset}` in the legacy
        // handlers below)
        const result = transformForEvent.transform(offset).safeParse(record);

        if (result.success && result.data === undefined) continue;
        if (!result.success) {
          logError({
            type: ErrorType.VALIDATION,
            error: result.error,
            metadata: { topicPartition, kafkaRecord, record },
          });
          continue;
        }
        console.log(JSON.stringify(result.data, null, 2));
        docs.push(result.data);
      } else {
        console.log(`No transform found for event: ${record.event}`);
      }
    } catch (error) {
      logError({
        type: ErrorType.BADPARSE,
        error,
        metadata: { topicPartition, kafkaRecord },
      });
    }
  }

  // Send all transformed records for indexing
  await bulkUpdateDataWrapper(osDomain, index, docs);
};
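
// Worked example (hypothetical values): a decoded record such as
//   { event: "new-submission", origin: "mako", id: "MD-24-0001", ... }
// is looked up in transforms["new-submission"]; transform(offset) yields a Zod
// schema whose safeParse produces the changelog document that gets pushed to
// docs and indexed via bulkUpdateDataWrapper. The event name and id here are
// illustrative, not taken from the real transform map.
//
// NOTE: the commented-out onemac and legacyAdminChanges handlers below predate
// processAndIndex and are kept for reference while legacy event handling is
// pending (see the TODO above).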

// const onemac = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
//   let docs: any[] = [];
//   for (const kafkaRecord of kafkaRecords) {
//     const { key, value, offset, timestamp } = kafkaRecord;
//     try {
//       // Skip delete events
//       if (!value) continue;

//       // Set id
//       const id: string = decodeBase64WithUtf8(key);

//       // Parse event data
//       const record = JSON.parse(decodeBase64WithUtf8(value));

//       // Process legacy events
//       if (record?.origin !== "mako") {
//         // Skip if it's not a submission event with a good GSI1pk
//         if (
//           record?.sk === "Package" ||
//           !record.GSI1pk?.startsWith("OneMAC#submit")
//         ) {
//           continue;
//         }
//         const result = opensearch.changelog.legacyEvent
//           .transform(id)
//           .safeParse(record);

//         if (result.success && result.data === undefined) continue;

//         // Log Error and skip if transform had an error
//         if (!result?.success) {
//           logError({
//             type: ErrorType.VALIDATION,
//             error: result?.error,
//             metadata: { topicPartition, kafkaRecord, record },
//           });
//           continue;
//         }

//         // If we made it this far, we push the document to the docs array so it gets indexed
//         docs.push(result.data);
//       }

//       // Process micro events
//       if (record?.origin === "mako") {
//         // Resolve actionType
//         const actionType = record.actionType || "new-submission";

//         // Push to docs so it can be indexed, with special handling for
//         // update-id and app-k (Appendix K) child-removal events
//         if (actionType === Action.UPDATE_ID) {
//           console.log("UPDATE_ID detected...");
//           await bulkUpdateDataWrapper(osDomain, index, docs);
//           docs = [];
//           const items = await os.search(osDomain, index, {
//             from: 0,
//             size: 200,
//             query: {
//               bool: {
//                 must: [{ term: { "packageId.keyword": id } }],
//               },
//             },
//           });
//           if (items === undefined || items.hits.hits === undefined) {
//             continue;
//           }
//           const modifiedHits: opensearch.changelog.Document[] =
//             items.hits.hits.map(
//               (hit: { _source: opensearch.changelog.Document }) => {
//                 return {
//                   ...hit._source,
//                   id: `${record.newId}-${hit._source.id.split("-").pop()}`,
//                   packageId: record.newId,
//                 };
//               },
//             );
//           docs.push(...modifiedHits);
//           docs.push({
//             ...record,
//             id: `${record.newId}-${offset}`,
//             packageId: record.newId,
//             oldPackageId: id,
//             newPackageId: record.newId,
//             timestamp,
//             actionType,
//           });
//           continue;
//         }
//         if (actionType === Action.REMOVE_APPK_CHILD) {
//           docs.push(
//             {
//               ...record,
//               id: `${record.appkParentId}-${offset}`,
//               packageId: record.appkParentId,
//               appkChildId: record.id,
//               timestamp,
//               actionType,
//             },
//             {
//               ...record,
//               id: `${record.id}-${offset}`,
//               appkParentId: record.appkParentId,
//               packageId: record.id,
//               timestamp,
//               actionType,
//             },
//           );
//           continue;
//         }
//         docs.push({
//           ...record,
//           id: `${id}-${offset}`,
//           packageId: id,
//           timestamp,
//           actionType,
//         });
//       }
//     } catch (error) {
//       logError({
//         type: ErrorType.BADPARSE,
//         error,
//         metadata: { topicPartition, kafkaRecord },
//       });
//     }
//   }
//   await bulkUpdateDataWrapper(osDomain, index, docs);
// };

// const legacyAdminChanges = async (
//   kafkaRecords: KafkaRecord[],
//   topicPartition: string,
// ) => {
//   const docs: any[] = [];
//   for (const kafkaRecord of kafkaRecords) {
//     const { key, value } = kafkaRecord;
//     try {
//       // Skip delete events
//       if (!value) continue;

//       // Set id
//       const id: string = decodeBase64WithUtf8(key);

//       // Parse event data
//       const record = JSON.parse(decodeBase64WithUtf8(value));

//       // Process legacy events
//       if (record?.origin !== "mako") {
//         // Skip if it's not a package view from onemac with adminChanges
//         if (
//           !(
//             record?.sk === "Package" &&
//             record.submitterName &&
//             record.adminChanges
//           )
//         ) {
//           continue;
//         }
//         for (const adminChange of record.adminChanges) {
//           const result = opensearch.changelog.legacyAdminChange
//             .transform(id)
//             .safeParse(adminChange);

//           if (result.success && result.data === undefined) continue;

//           // Log Error and skip if transform had an error
//           if (!result?.success) {
//             logError({
//               type: ErrorType.VALIDATION,
//               error: result?.error,
//               metadata: { topicPartition, kafkaRecord, record },
//             });
//             continue;
//           }

//           // If we made it this far, we push the document to the docs array so it gets indexed
//           docs.push(result.data);
//         }
//       }
//     } catch (error) {
//       logError({
//         type: ErrorType.BADPARSE,
//         error,
//         metadata: { topicPartition, kafkaRecord },
//       });
//     }
//   }
//   await bulkUpdateDataWrapper(osDomain, index, docs);
// };
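
// Hypothetical unit-test sketch (coverage is currently 0%). Assumes vitest and
// a mocked bulkUpdateDataWrapper; makeKafkaEvent is a placeholder helper, not
// an existing export:
//
//   import { expect, it, vi } from "vitest";
//   import { bulkUpdateDataWrapper } from "../libs/sink-lib";
//   import { handler } from "./sinkChangelog";
//
//   vi.mock("../libs/sink-lib", async (importOriginal) => ({
//     ...(await importOriginal<typeof import("../libs/sink-lib")>()),
//     bulkUpdateDataWrapper: vi.fn(),
//   }));
//
//   it("indexes nothing when every record is a tombstone", async () => {
//     const event = makeKafkaEvent("aws.onemac.migration.cdc-0", [{ value: null }]);
//     await handler(event, {} as any, vi.fn());
//     expect(bulkUpdateDataWrapper).toHaveBeenCalledWith(
//       process.env.osDomain,
//       expect.stringContaining("changelog"),
//       [],
//     );
//   });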