universoimpulso/atena

View on GitHub
src/services/amqp/index.js

Summary

Maintainability
C
1 day
Test Coverage
import { Connection } from 'amqplib-as-promised'
import chalk from 'chalk'
import { sendError } from 'log-on-slack'

import { handle as handleEnlistmentPayload } from './enlistment'
import { handle as handleImpulserAppPayload } from './impulserApp'
import { handle as handleMinerPayload } from './miner'

const { AMQP_URL: amqpUrl, ATENA_EXCHANGE: atenaExchange } = process.env

let channel

export const connect = async () => {
  try {
    const connection = new Connection(amqpUrl)
    await connection.init()
    channel = await connection.createChannel()
    channel.prefetch(1)
    await channel.assertExchange(atenaExchange, 'fanout', { durable: false })
    console.log(
      '%s [*] Ready to publish messages on',
      chalk.green('✓'),
      atenaExchange
    )

    await connectToMiner()
    await connectToEnlistment()
    await connectToImpulserApp()

    return channel
  } catch (error) {
    sendError({
      file: 'services/amqp/index.js - connect',
      error
    })
    process.exit(1)
  }
}

const connectToEnlistment = async () => {
  const {
    ENLISTMENT_EXCHANGE: enlistmentExchange,
    QUEUE_FOR_ENLISTMENT: queueForEnlistment
  } = process.env

  await channel.assertExchange(enlistmentExchange, 'fanout', { durable: false })

  await channel.assertQueue(queueForEnlistment, { durable: true })

  await channel.bindQueue(queueForEnlistment, enlistmentExchange)

  channel.consume(queueForEnlistment, message =>
    handleEnlistmentPayload({ message, channel })
  )

  console.log(
    '%s [*] Awaiting messages on',
    chalk.green('✓'),
    queueForEnlistment
  )
}

const connectToImpulserApp = async () => {
  const {
    IMPULSER_APP_EXCHANGE: impulserAppExchange,
    QUEUE_FOR_IMPULSER_APP: queueForImpulserApp
  } = process.env

  await channel.assertExchange(impulserAppExchange, 'fanout', {
    durable: false
  })

  await channel.assertQueue(queueForImpulserApp, { durable: true })

  await channel.bindQueue(queueForImpulserApp, impulserAppExchange)

  channel.consume(queueForImpulserApp, message =>
    handleImpulserAppPayload({ message, channel })
  )
  console.log(
    '%s [*] Awaiting messages on',
    chalk.green('✓'),
    queueForImpulserApp
  )
}

const connectToMiner = async () => {
  try {
    const {
      MINER_EXCHANGE: minerExchange,
      QUEUE_FOR_MINER: queueForMiner
    } = process.env

    await channel.assertExchange(minerExchange, 'fanout', {
      durable: false
    })

    await channel.assertQueue(queueForMiner, { durable: true })

    await channel.bindQueue(queueForMiner, minerExchange)

    channel.consume(queueForMiner, message =>
      handleMinerPayload({ message, channel })
    )
    console.log('%s [*] Awaiting messages on', chalk.green('✓'), queueForMiner)
  } catch (error) {
    console.log(error)
  }
}

export const publishToEnlistment = async payload => {
  const queueOpts = { persistent: false, type: payload.type }
  const message = Buffer.from(JSON.stringify(payload))
  try {
    await channel.publish(atenaExchange, '', message, queueOpts)
  } catch (error) {
    sendError({
      file: 'services/amqp/index.js - publishToEnlistment',
      payload,
      error
    })
  }
}