sshivananda/ts-sqs-consumer

View on GitHub
src/message-processor/MessageProcessor.ts

Summary

Maintainability
A
1 hr
Test Coverage
A
96%
import { SQS } from 'aws-sdk';
import { v4 as uuidv4 } from 'uuid';

import { SQSOptions } from '../sqs/SQSOptions';
import { CustomSQSOptions } from '../sqs/CustomSQSOptions';
import { CustomSQS } from '../sqs/CustomSQS';
import { ReceiveMessageOptions } from '../sqs/ReceiveMessageOptions';
import { SQSMessage } from './SQSMessage';
import { ILogger } from '../logger/ILogger';

/**
 * MessageProcessor
 * Exposes implementations to handle messages that
 * signify an job
 */
export default class MessageProcessor<T extends SQSMessage> {
  // Instance of a logger object
  private readonly logger: ILogger;

  // Instance of a SQS object
  private sqsClient: SQS = new SQS();

  private readonly receiveMessageOptions: ReceiveMessageOptions;

  constructor(options: {
    logger: ILogger;
    sqsOptions: SQSOptions;
  }) {
    this.logger = options.logger;
    switch (true) {
      case (options != null
        && (options.sqsOptions as CustomSQSOptions).clientOptions != null
        && (options.sqsOptions as CustomSQS).sqsClient != null):
        throw new Error('Either custom sqs objects or sqs options should be specified - but not both.');
      case (options != null
        && options.sqsOptions != null
        && (options.sqsOptions as CustomSQS).sqsClient != null):
        this.sqsClient = (options!.sqsOptions as CustomSQS).sqsClient;
        break;
      case (options != null
        && options.sqsOptions != null
        && (options.sqsOptions as CustomSQSOptions) != null):
        this.sqsClient = new SQS({
          region: (options.sqsOptions as CustomSQSOptions).clientOptions.region,
        });
        break;
      default:
        break;
    }

    this.receiveMessageOptions = options.sqsOptions.receiveMessageOptions;
  }

  /**
   * getMessages
   * Retrieves message from sqs
   */
  public async getMessages(): Promise<T[] | void> {
    const queryOutput: AWS.SQS.Types.ReceiveMessageResult | void = await this
      .sqsClient
      .receiveMessage({
        QueueUrl: this.receiveMessageOptions.queueUrl,
        VisibilityTimeout: this.receiveMessageOptions.visibilityTimeout,
        WaitTimeSeconds: this.receiveMessageOptions.waitTimeSeconds,
        MaxNumberOfMessages: this.receiveMessageOptions.maxNumberOfMessages,
      })
      .promise()
      .catch((err: Error): void => {
        throw err;
      });
    return this.getValidMessages({
      queryOutput: queryOutput,
    });
  }

  /**
   * markMessageAsProcessed
   * Marks a message as processed
   */
  public async markMessagesAsProcessed(options: {
    messages: T[];
  }): Promise<boolean> {
    const deleteMessageRequest: AWS.SQS.DeleteMessageBatchRequest = {
      QueueUrl: this.receiveMessageOptions.queueUrl,
      Entries: [],
    };
    for (const message of options.messages) {
      deleteMessageRequest.Entries.push({
        Id: uuidv4(),
        ReceiptHandle: message.handle,
      });
    }
    await this
      .sqsClient
      .deleteMessageBatch(deleteMessageRequest)
      .promise()
      .catch((err: Error): void => {
        throw err;
      });

    return true;
  }

  /**
   * Given the output of sqs receive message
   * returns the list of valid messages
   * @param options.queryOutput Output of aws.sqs.receiveMessage
   */
  private async getValidMessages(options: {
    queryOutput: AWS.SQS.Types.ReceiveMessageResult | void;
  }): Promise<T[]> {
    const validMessages: T[] = [];
    if (options.queryOutput !== undefined
      && options.queryOutput.Messages !== undefined) {
      for (const sqsMessage of options.queryOutput.Messages) {
        // Adding a try catch so that a single bad message does not halt processing
        try {
          if (sqsMessage.Body !== undefined) {
            const sqsMessageData: T = <T> JSON.parse(sqsMessage.Body);
            validMessages.push({
              ...sqsMessageData,
              handle: sqsMessage.ReceiptHandle,
            });
          }
        } catch (err) {
          this.logger.log(err);
        }
      }
    } else {
      this.logger.log('No valid message found');
    }

    return validMessages;
  }
}