mimiz/rabbitmq-event-manager

View on GitHub
README.md

Summary

Maintainability
Test Coverage
# RabbitMQ Event Manager

[![Build Status](https://travis-ci.org/mimiz/rabbitmq-event-manager.svg?branch=master)](https://travis-ci.org/mimiz/rabbitmq-event-manager) [![Maintainability](https://api.codeclimate.com/v1/badges/bfd3cd4f2f47356c09f6/maintainability)](https://codeclimate.com/github/mimiz/rabbitmq-event-manager/maintainability) [![Test Coverage](https://api.codeclimate.com/v1/badges/bfd3cd4f2f47356c09f6/test_coverage)](https://codeclimate.com/github/mimiz/rabbitmq-event-manager/test_coverage)

A Node Event Manager using RabbitMQ to exchange events.

Exchanges and Queues are automatically created.

Here is a small schema :

![RabbitMQ Schema](https://raw.githubusercontent.com/mimiz/rabbitmq-event-manager/master/doc/RABBITMQ-Schema.png)

## Install

```
npm install rabbitmq-event-manager
```

Or with Yarn

```
yarn add rabbitmq-event-manager
```

## Basic Example

- **Initialize**

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({
  url: 'amqp://localhost',
  application: 'CONSUMER'
});
myEventManager.initialize()
    .then(()=>{
        /** Do something after initialization */
    })
    .catch((err)=> {
        /** An error occured while initialization */
    });
```

- **Consumer**

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({
  url: 'amqp://localhost',
  application: 'CONSUMER'
});
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
});
```

The handler function, by default will tell RabbitMQ to _"acknowledge"_ the message.

:warning: If you want to **flush** the message you can simply throw an exception ...

This will create the following elements in RabbitMQ :

- An Exchange of type **fanout** named : `MY_EVENT_NAME` **(the application name is not USED)**
- One Queue `CONSUMER::MY_EVENT_NAME` bound to the Exchange `MY_EVENT_NAME` **(the application name _IS_ USED)**

- **Producer**

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', application: 'PRODUCER_1' });

myEventManager.emit('MY_EVENT_NAME', payload);
```

**Note:** Since version **1.1.0** the `emit` function return a promise that resolves with the payload effectively sent to RabbitMQ (ie: you can access the `_metas` informations).

This will create the following elements in RabbitMQ :

- An Exchange of type **fanout** named : `MY_EVENT_NAME` **(the application name is not USED)**

> NOTE: :warning: A very good convention will be to prefix the name of the __event name__ with the emitter application name, for example : `PRODUCER_1.MY_EVENT_NAME` but it's not mandatory.

If a new Consumer is created and listen the same event :

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({url:'amqp://localhost'}, appName:'OTHER_CONSUMER');
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
    return true;
});
```

It will add a queue `OTHER_CONSUMER::MY_EVENT_NAME` bound to the Exchange `MY_EVENT_NAME`.

## Emit And Wait

This feature has been introduced in version **1.1.0**, and allow you to emit an event and wait for a response (from another event, or from a generated event name).

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({ url: 'amqp://localhost', application: 'PRODUCER_1' });
const payload = { a: 42, b: 58 };
myEventManager.on('add', async eventPayload => {
  return { result: eventPayload.a + eventPayload.b };
});
const response = await myEventManager.emitAndWait('add', payload);

console.log(response.result); // 100
```

The above code, will generate a queue with a name : `add.RESPONSE.$$GUID$$` where guid is the value of `_metas.correlationId`.

This queue should be deleted after event is received, if something wrong happened, the message may be flushed to the Dead letter queue.

## Options

| Name                       | Type                  | Default                | Description                                                                                                                                                             |
| -------------------------- | --------------------- | ---------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| url                        | String                | `amqp://localhost`     | The connection URL of the RabbitMQ Server                                                                                                                               |
| application                | String                | n/a                    | The name of the application (used for naming exchanges and queues).                                                                                                     |
| metas                      | boolean or (function) | true                   | Weither or not to add `_metas` infirmations in the event, If a function this returned value, will become the `_metas` object (see <Metas Informations>)                 |
| alternateExchangeName      | String                | `DEAD_LETTER_EXCHANGE` | The name of the dead letter exchange you would like to use, (:warning: remember this must be the same value for producer and consumers)                                 |
| alternateQueueName         | String                | `DEAD_LETTER_QUEUE`    | The name of the dead letter queue (bound to the dead letter exhange) you would like to use, (:warning: remember this must be the same value for producer and consumers) |
| deadLetterExchangeName     | String                | `NO_QUEUE_EXCHANGE`    | The name of the alternate exchange you would like to use, (:warning: remember this must be the same value for producer and consumers)                                   |
| deadLetterQueueName        | String                | `QUEUE_NO_QUEUE`       | The name of the alternate exchange queue you would like to use, (:warning: remember this must be the same value for producer and consumers)                             |
| ttl                        | Number                | `86400000` (24h)       | The default TTL before flushing event to the Dead Letter Exchange                                                                                                       |
| maxNumberOfMessagesRetries | Numbner               | `100`                  | The number of tries the consumer will treat one specific message, before flushing it to the dead letter exhange.                                                        |
| logPrefix                  | string                | [RABBITMQ]             | The text that will be printed before the error log                                                                                                                      |
| logLevel                   | string                | error                  | The log Level [(see winston logLevels)](https://github.com/winstonjs/winston#logging-levels)                                                                            |
| logTransportMode           | string                | console                | Mute (no log), or output to console. Possible values are (_"console"_ or _"mute"_)                                                                                      |
| emitAndWaitTimeout         | number                | 30000 (30s)            | Define the maximum time to wait for a event                                                                                                                             |
| defaultResponseSuffix      | string                | `.RESPONSE`            | The suffix to add to response event name when waiting for a response                                                                                                    |

## Metas Informations

By defaut, some metas data are added to the payload :

- guid : A unique id generated, to be able to debug for example, or for following the event.
- timestamp : A number of milliseconds elapsed since January 1, 1970 00:00:00 UTC. (`Date.now()`)
- name : A string which is the name of the emitted event.
- applicationName: The value of the application which emits the Event.
- correlationId: _(optional)_ a unique ID (guid) to be used when waiting for a response when using `emitAndWait`
- replyTo: _(optional)_ the event to reply to when waiting for a response when using `emitAndWait`

So if your payload is :

```js
{
  userId: 42;
}
```

With Metas data it will be :

```js
{
    _metas:{
        guid: '465e008c-d37f-4e31-b494-023e6d187946',
        name: 'MY_EVENT_NAME',
        timestamp: 1519211809934,
        applicationName: 'PRODUCER_1'
    },
    userId:42
}
```

You can remove metas informations by settings the option value "metas" to false.

You can also override the metas generation by giving a function as _metas_ options value (on the emitter side only, as the event is generated there).

### With no metas

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManagerWithNoMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
  metas: false,
});
const payload = { userId: 42 };
myEventManagerWithNoMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    userId:42
// }
```

### Override Metas

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManagerOverrideMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
  metas: sourceMetas => {
    // sourceMetas contains the default metaa
    return {
      ...sourceMetas,
      otherProperty: 'MyValue',
    };
  },
});
const payload = { userId: 42 };
myEventManagerOverrideMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    _metas: {
//        guid : '465e008c-d37f-4e31-b494-023e6d187947'
//        name: 'MY_EVENT_NAME',
//        timestamp: 1519211809934,
//        otherProperty:'MyValue'
//    }
//    userId:42
// }
```

### Add metas per events

Since version **1.1.0** you can add (or override) the `_metas` property by setting it in the event paylaod :

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManagerOverrideMetas = new EventManager({
  url: 'amqp://localhost',
  appName: 'PRODUCER_1',
});
const payload = { _metas: { newKey: 'value' }, userId: 42 };
myEventManagerOverrideMetas.emit('MY_EVENT_NAME', payload);
// Payload will be
// {
//    _metas: {
//        guid : '465e008c-d37f-4e31-b494-023e6d187947'
//        name: 'MY_EVENT_NAME',
//        timestamp: 1519211809934,
//        newKey:'value'
//    }
//    userId:42
// }
```

This will be added only for this `emit`.

## DEAD LETTER QUEUE

From the RabbitMQ documenation, the [Dead Letter Exchange](https://www.rabbitmq.com/dlx.html) is a RabbitMQ Exchange, that is attached to a queue. And message in that queue can be _"dead-lettered"_ if the queue reach its _length limit_, or, if the messages has _expired_ (TTL).

By default, The Exchange `DEAD_LETTER_EXCHANGE` (and its bound queue `DEAD_LETTER_QUEUE`) is automatically created and attached to all queues.

The names of the queue and the exchange can be changed by setting the options properties.

See <Acknowledge / N-Acknowledge> to see how to send an event to the _Dead Letter Exchange_

> :warning: Be carefull, if you change the names of the `DEAD_LETTER_EXCHANGE` and the `DEAD_LETTER_QUEUE`, remember that you will have to do it for all producers and all consumers, as they will all use the same RabbitMQ server.

## QUEUE NO QUEUE

When an _Event Exchange_ is created, an exchange `NO_QUEUE_EXCHANGE` is created and a queue named `QUEUE_NO_QUEUE` is created and bound to it.

When an event is emitted to the _Event Exchange_ if the exchange has no queue bounded to it, all the messages are routed to the `NO_QUEUE_EXCHANGE`

The names of the queue and the exchange can be changed by setting the options properties.

> :warning: Be carefull, if you change the names of the `DEAD_LETTER_EXCHANGE` and the `DEAD_LETTER_QUEUE`, remember that you will have to do it for all producers and all consumers, as they will all use the same RabbitMQ server.

## Acknowledge / N-Acknowledge

The `return` statement at the end of the handler function, will tell RabbitMQ to _"acknowledge"_ the message.

You can _"negatively acknowledge"_ and **Requeue** the message by returning `false` (rejecting the Promise).

If you don't want to **requeue** the message you can simply throw an exception ...

### Acknowledge the message

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({url:'amqp://localhost'}, appName:'OTHER_CONSUMER');
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    return {payload}; // the message will be acknowledge
    // or even
    // return;
    // or nothing
});
```

After the message is acknowledged, it will be removed from the queue and deleted.

### Flush the message

```js
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({url:'amqp://localhost'}, appName:'OTHER_CONSUMER');
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    throw new Error('This will flush the message to DEAD LETTER QUEUE')
});
```

After the message is negatively acknowledged, it will be send to the Dead Letter Exhange, so in the queue DEAD_LETTER_QUEUE.

## NOTES :

### Should we integrate the application name in the event name

In a world of _events_ an event is fired, and some listeners will listen some events.
So with events sent by "Application", if we have an Application `UserAdminApp` which will send the event 'USER_CREATED', and we have an other application (WelcomeEmail) (or service) wanted to send on email to new users ...
So let's define that `WelcomeEmail` is listening USER_CREATED, it should knows that the event was fired by the `UserAdminApp`, but does we need to add the name of the application in event payload (\_metas), or in the event name.

On the `WelcomeEmail` side :

```ts
myEventManager.on('UserAdminApp.USER_CREATED', payload => {
  /* ... */
});
```

Or

```ts
myEventManager.on('USER_CREATED', payload => {
  /* 
    payload._metas.application = UserAdminApp
*/
});
```

If we consider **RabbitMQ** it means that the Exchange name will be `UserAdminApp.USER_CREATED` or `USER_CREATED`, so listening queues will be bound to the exchange.

Regarding this, I really think that the event should be `USER_CREATED` without any consideration of the application name, but as it is important to be able to know which application fires wich event, we may add the application name in the \_metas information of the event's payload;

### Alternate Exchange notes

- If the _"Alternate Exchange"_ was not created first it's not a problem, as it's configured, the only thing is that if one message is sent to the exchange 'My*EVENT', and the *"Alternate Exchange"\_ does not exists (if no queues are bound to the exchange 'My_EVENT'), the message will be lost !

- When we listen to an event :

```ts
import EventManager from 'rabbitmq-event-manager';
const myEventManager = new EventManager({url:'amqp://localhost'}, appName:'CONSUMER');
myEventManager.on('MY_EVENT_NAME', async (payload)=>{
    console.log(payload);
    return true;
});
```

It will automatically create an exchange of name `MY_EVENT_NAME`, and a Queue : `CONSUMER::MY_EVENT_NAME` bound to that exchange.

The Queue `CONSUMER::MY_EVENT_NAME` is configured with the _DEAD LETTER EXHANGE_, even if that exchange does not exists yet. It means that if a `MY_EVENT_NAME` is emmited, and the "listener" mark the event to be flush (dead lettered), the message will be lost (as no _DEAD LETTER EXHANGE_ is define, so ne queue are bound to it...).

:warning: In RabbitMQ only queue store messages, not exchangesso it's important that you initialize your rabbitMQ instance with the values of _alternateExchangeName_, _alternateQueueName_, _deadLetterExchangeName_, and _deadLetterQueueName_

### Requeue with delay

It could be very intersting to _"negatively acknowledge"_ a message and ask to be requeue after a delay, but this will be (if can) in version 2 !

:warning: Remember that the purpose of RabbitMQ is to deliver messages not to store them, so _"requeing with delay"_ should be done in a specific application.

### Delete a message

I recently wrote another npm package [rabbitmq-delete-message](https://www.npmjs.com/package/rabbitmq-delete-message) to be able to delete a message from a RabbitMQ Queue, this package aimed to delete some messages from the _DEAD_LETTER_QUEUE_ or the _QUEUE_NO_QUEUE_.

## Demos

You can find two dummies examples in the `/demo` folder, here is how to run them.

### Using Docker

If you have docker on your laptop, you can create a RabbitMQ instance running tthe following command : 

```
docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
```

and then run (from root folder)

```
yarn demo:docker
```


### Using CloudAMQP

You need to have an account on the https://www.cloudamqp.com/ website and to have created an instance, ones it's done you can run the folowwing command (from root folder) : 

```
RABBITMQ_URL=amqp://user:password@bullfrog.rmq.cloudamqp.com/home yarn demo:cloudamqp
```