LD4P/sinopia_indexing_pipeline

View on GitHub
src/Pipeline.js

Summary

Maintainability
A
0 mins
Test Coverage
A
92%
import Listener from "./Listener"
import Logger from "./Logger"
import Indexer from "./Indexer"
import { replaceInKeys } from "./utilities"

/**
 * Connects a Listener to an Indexer: every message delivered by the listener
 * is logged and, depending on its `operationType`, turned into an index or
 * delete call on the indexer.
 */
export default class Pipeline {
  constructor() {
    this.listener = new Listener()
    this.logger = new Logger()
    this.indexer = new Indexer()
  }

  /**
   * Runs the pipeline.
   *
   * Registers a callback with the listener that dispatches each message:
   * - "delete" messages are forwarded to `delete` with `documentKey._id`
   * - "insert" and "replace" messages are forwarded to the method of the
   *   same name with the key-restored `fullDocument`
   * - every other operation type is ignored
   *
   * The callback returns whatever the handler returns, so a listener that
   * awaits its callback also waits for (and sees failures from) the indexer
   * instead of leaving that work as a floating promise.
   *
   * @returns {Promise<void>} settles when `listener.listen` settles
   */
  async run() {
    await this.listener.listen(async (message) => {
      this.logger.debug(`received message: ${JSON.stringify(message)}`)

      if (message.operationType === "delete")
        return this.delete(message.documentKey._id)

      // Only these operation types have a handler method on this class;
      // checking first also keeps the dynamic dispatch below from reaching
      // any other property.
      if (!["insert", "replace"].includes(message.operationType)) return

      // Need to map ! back to . in keys.
      const doc = replaceInKeys(message.fullDocument, "!", ".")

      // Invoke the method for the operation type and hand its result back to
      // the listener.
      return this[message.operationType](doc)
    })
  }

  /**
   * Indexes a document.
   *
   * @param {Object} doc - document to pass to the indexer
   * @returns {*} whatever `indexer.index` returns (presumably a promise --
   *   confirm against Indexer)
   */
  insert(doc) {
    return this.indexer.index(doc)
  }

  /**
   * Re-indexes a document.
   *
   * Updates and creates are handled the same way since we index w/ an ID.
   *
   * @param {Object} doc - document to pass to the indexer
   * @returns {*} whatever `insert` returns
   */
  replace(doc) {
    return this.insert(doc)
  }

  /**
   * Removes a document from the index.
   *
   * @param {*} mongoId - identifier taken from the message's `documentKey._id`
   * @returns {*} whatever `indexer.delete` returns (presumably a promise --
   *   confirm against Indexer)
   */
  delete(mongoId) {
    return this.indexer.delete(mongoId)
  }
}