LD4P/sinopia_indexing_pipeline

View on GitHub
src/Listener.js

Summary

Maintainability
A
0 mins
Test Coverage
F
33%
import config from "config"
import Logger from "./Logger"
import connect from "./mongo"

/**
 * Watches a MongoDB collection through a change stream and hands every change
 * event to a caller-supplied callback.
 */
export default class Listener {
  /**
   * Reads the names of the database and collection to watch from the
   * "dbName" and "collectionName" configuration keys and creates a logger.
   */
  constructor() {
    this.dbName = config.get("dbName")
    this.collectionName = config.get("collectionName")
    this.logger = new Logger()
  }

  /**
   * @callback messageCallback
   * @param {Object} change - Change event read from the change stream
   */

  /**
   * Opens a change stream on the configured collection and invokes the
   * callback once per change event, in the order the events are read.
   *
   * The callback is not awaited: a promise it returns is neither waited on nor
   * observed here, so asynchronous callbacks must handle their own failures.
   *
   * Errors raised while connecting, reading the stream, or synchronously by
   * the callback are logged and end the loop; they are not rethrown. The
   * change stream is closed whenever the loop ends, whatever the reason.
   *
   * @param {messageCallback} onNewMessage - Callback that handles each change event
   * @returns {Promise<void>} Settles once the stream ends or an error was logged
   */
  async listen(onNewMessage) {
    let changeStream

    try {
      const client = await connect()
      this.logger.debug(`watching ${this.dbName}.${this.collectionName}`)

      // See https://developer.mongodb.com/quickstart/nodejs-change-streams-triggers
      // AWS requires setting readPreference for collection.
      changeStream = client
        .db(this.dbName)
        .collection(this.collectionName, { readPreference: "primary" })
        .watch({ fullDocument: "updateLookup" })

      while (await changeStream.hasNext()) {
        onNewMessage(await changeStream.next())
      }
    } catch (error) {
      this.logger.error(error)
    } finally {
      if (changeStream) {
        try {
          await changeStream.close()
        } catch (closeError) {
          // Logged separately so a failing close never masks the original error.
          this.logger.error(closeError)
        }
      }
    }
  }
}