sshivananda/ts-sqs-consumer

View on GitHub
src/SQSConsumer.ts

Summary

Maintainability
A
1 hr
Test Coverage
A
100%
import { SQSConsumerOptions } from './SQSConsumerOptions';
import LogLevels from './logger/LogLevels';
import { ILogger } from './logger/ILogger';
import Logger from './logger/Logger';
import { SQSMessage } from './message-processor/SQSMessage';
import MessageProcessor from './message-processor/MessageProcessor';

/**
 * Coordinates polling for SQS messages, handing each received message to a
 * caller-supplied job processor, and marking each successfully handled batch
 * as processed.
 *
 * Fetching messages and marking them as processed are delegated to
 * `MessageProcessor`; this class only drives the polling loop and the
 * error policy.
 */
export default class SQSConsumer<T extends SQSMessage> {
  // Logger used for progress messages and for errors caught while polling.
  private readonly logger: ILogger;

  // Maximum number of polling iterations. Any negative value (the default
  // is -1) means "keep polling without limit".
  private readonly maxSearches: number = -1;

  // Fetches batches of messages and marks them as processed.
  private readonly messageProcessor: MessageProcessor<T>;

  // Caller-supplied handler invoked once per received message.
  private readonly jobProcessor: ((message: T) => Promise<void>);

  // When true, the first error raised during a polling iteration aborts
  // processPendingJobs; when false, the error is logged and polling continues.
  private readonly stopAtError: boolean = false;

  /**
   * @param options.sqsOptions Forwarded to the message processor; its
   *   `receiveMessageOptions.maxSearches`, when set, overrides the default
   *   search limit of -1.
   * @param options.jobProcessorOptions Supplies the per-message
   *   `jobProcessor` and, optionally, the `stopAtError` flag.
   */
  constructor(options: SQSConsumerOptions<T>) {
    // The logger is always created at debug level.
    this.logger = new Logger({
      logLevel: LogLevels.debug,
    });

    this.messageProcessor = new MessageProcessor<T>({
      logger: this.logger,
      sqsOptions: options.sqsOptions,
    });
    if (options.sqsOptions.receiveMessageOptions.maxSearches != null) {
      this.maxSearches = options.sqsOptions.receiveMessageOptions.maxSearches;
    }
    this.jobProcessor = options.jobProcessorOptions.jobProcessor;
    if (options.jobProcessorOptions.stopAtError != null) {
      this.stopAtError = options.jobProcessorOptions.stopAtError;
    }
  }

  /**
   * Polls for pending jobs and processes them.
   *
   * Each iteration fetches one batch of messages and processes it. The loop
   * ends once `maxSearches` iterations have run; a negative `maxSearches`
   * never ends the loop on its own.
   *
   * @returns The number of polling iterations that were performed.
   * @throws Whatever error a polling iteration raised, but only when
   *   `stopAtError` is true. Otherwise errors are logged and polling goes on.
   */
  public async processPendingJobs(): Promise<number> {
    // Compare with `<` rather than `!==` so that a limit the counter can
    // never land on exactly (e.g. a fractional value) still ends the loop.
    const unlimited = this.maxSearches < 0;
    let searchCounter = 0;
    while (unlimited || searchCounter < this.maxSearches) {
      try {
        this.logger.log('Searching for sqs messages');
        const messages: T[] | void = await this.messageProcessor.getMessages();
        await this.processAndDeleteMessages({ messages });
      } catch (err) {
        this.logger.log(err);
        if (this.stopAtError) {
          throw err;
        }
      } finally {
        // Failed iterations count towards the limit as well.
        searchCounter += 1;
      }
    }

    return searchCounter;
  }

  /**
   * Processes messages one at a time, in order, and then marks the whole
   * batch as processed. Does nothing when no messages were supplied.
   *
   * If the job processor rejects for any message, the error propagates
   * immediately and nothing in the batch is marked as processed, including
   * messages that were already handled successfully.
   * NOTE(review): presumably those messages are redelivered later, so job
   * processors should tolerate seeing a message twice - confirm.
   *
   * @param options.messages Messages to be processed and deleted from queue
   */
  private async processAndDeleteMessages(options: {
    messages: T[] | void
  }): Promise<void> {
    const { messages } = options;
    if (!messages) {
      return;
    }

    for (const message of messages) {
      await this.jobProcessor(message);
    }
    await this.messageProcessor.markMessagesAsProcessed({ messages });
  }
}