mimiz/rabbitmq-event-manager

View on GitHub
src/adapter/helper/consume.ts

Summary

Maintainability
A
1 hr
Test Coverage
import * as amqp from 'amqplib';
import { EventManagerError } from '../../lib/EventManagerError';
import { ConsumeHandlerFunction, IEventManagerOptions, IEventPayload } from '../../lib/interfaces';
import { LOGGER } from '../../lib/logger';

/**
 * Starts consuming `queueName` on `channel` and hands each delivered message to `listener`.
 *
 * For every delivery the content is parsed as JSON and its `_metas.guid` is read, then:
 * - if the listener's promise fulfils, the message is acked and the returned promise
 *   resolves with the listener's response;
 * - if the listener rejects or throws synchronously, the message is flushed (nacked) and
 *   the returned promise rejects with an `EventManagerError`;
 * - if the content cannot be parsed (invalid JSON, or no `_metas` object), the message is
 *   flushed and the returned promise rejects with an `EventManagerError`;
 * - if the delivery is `null`, the returned promise rejects with an `EventManagerError`.
 *
 * A failure to register the consumer (synchronous throw or rejected promise from
 * `channel.consume`) also rejects the returned promise.
 *
 * NOTE: a promise settles only once, so the returned promise reflects the outcome of the
 * first delivery only. Later deliveries are still acked/flushed, but their results are not
 * observable through the returned promise.
 *
 * @param channel   channel used to consume, ack and nack
 * @param queueName name of the queue to consume
 * @param listener  handler invoked with the parsed payload of each message
 * @param options   event manager options, forwarded to the flush helper
 * @returns a promise settled by the outcome of the first delivery (see note above)
 */
export function consume(channel: amqp.Channel, queueName: string, listener: ConsumeHandlerFunction, options: IEventManagerOptions): Promise<IEventPayload | void | null> {
  LOGGER.info(`Consume messages of queue  ${queueName}`);
  return new Promise((resolve, reject) => {
    const onMessage = (message: amqp.ConsumeMessage | null): void => {
      LOGGER.debug(`Message received on queue  ${queueName}`);
      if (!message) {
        reject(new EventManagerError(`Message received is null or not defined`));
        return;
      }

      // Step 1: decode the message. Only failures of this step are reported as parse errors.
      let payload;
      let key: string;
      try {
        payload = JSON.parse(message.content.toString());
        key = payload._metas.guid;
      } catch (e) {
        LOGGER.error(`Message unreadable, unable to parse JSON parsed on ${queueName}, flushing...`, e);
        flushMessage({
          message,
          channel,
          key: 'unable to parse key',
          options,
        });
        reject(new EventManagerError(`Error Parsing message`, e));
        return;
      }

      // Step 2: run the listener. It is still invoked synchronously, but a synchronous throw
      // is converted into a rejection so it follows the listener-error path (with the real
      // guid) instead of being misreported as a JSON parse error.
      let pending: Promise<IEventPayload | void | null>;
      try {
        pending = Promise.resolve(listener(payload));
      } catch (err) {
        pending = Promise.reject(err);
      }

      pending
        .then((response: IEventPayload | void | null) => {
          acknowledgeMessage({ message, key, channel });
          resolve(response);
        })
        .catch(err => {
          flushMessage({ message, channel, key, options });
          reject(new EventManagerError('Listener throws Error, message has been flushed', err));
        });
    };

    try {
      // channel.consume reports failures asynchronously through the promise it returns, which
      // a try/catch alone cannot observe. Promise.resolve also tolerates a channel
      // implementation that returns a plain value.
      Promise.resolve(channel.consume(queueName, onMessage)).catch(e => {
        reject(new EventManagerError(`Error Consuming queue.`, e));
      });
    } catch (e) {
      reject(new EventManagerError(`Error Consuming queue.`, e));
    }
  });
}

/**
 * Arguments of the `flushMessage` helper.
 */
interface IFlushMessageParams {
  /** The consumed message to negatively acknowledge. */
  message: amqp.ConsumeMessage;
  /** The channel on which the message is nacked. */
  channel: amqp.Channel;
  /** Identifier written to the log as the message guid. */
  key: string;
  /** Event manager options; passed by callers but not read by `flushMessage` as written. */
  options: IEventManagerOptions;
}
/**
 * Helper function to make the consume method more readable.
 *
 * Negatively acknowledges `params.message` on `params.channel` and logs a warning that
 * carries `params.key` as the message guid. `params.options` is accepted for interface
 * compatibility but is not used here.
 *
 * @param params message, channel and log key of the message to flush
 */
function flushMessage(params: IFlushMessageParams) {
  const { channel, message, key } = params;
  // nack(message, allUpTo, requeue): reject this message only and do not requeue it.
  // What the broker then does with it (drop / dead-letter) depends on the queue setup.
  channel.nack(message, false, false);
  LOGGER.warn(`Message with guid ${key} has been flushed.`);
}

/**
 * Arguments of the `acknowledgeMessage` helper.
 */
interface IAcknowledgeMessageParams {
  /** Identifier of the message (callers pass the payload's `_metas.guid`). */
  key: string;
  /** The channel on which the message is acked. */
  channel: amqp.Channel;
  /** The consumed message to acknowledge. */
  message: amqp.ConsumeMessage;
}
/**
 * Acknowledges `params.message` on `params.channel`, then logs the acknowledgement
 * with `params.key` as the message guid.
 *
 * The log line is written only after `channel.ack` has returned, so it is never
 * emitted for a message whose ack call threw.
 *
 * @param params message, channel and log key of the message to acknowledge
 */
function acknowledgeMessage(params: IAcknowledgeMessageParams) {
  const { key, channel, message } = params;
  channel.ack(message);
  LOGGER.debug(`Message with guid ${key} has been acknowledged.`);
}