src/rabbitmq.js
/* @flow */
/* global Bluebird$Promise RabbitMQChannel RabbitMQConfirmChannel RabbitMQConnection SubscribeObject RabbitMQOptions QueueObject */
'use strict'
const amqplib = require('amqplib')
const defaults = require('101/defaults')
const getNamespace = require('continuation-local-storage').getNamespace
const hasKeypaths = require('101/has-keypaths')
const Immutable = require('immutable')
const isFunction = require('101/is-function')
const isObject = require('101/is-object')
const isString = require('101/is-string')
const joi = require('joi')
const monitor = require('monitor-dog')
const Promise = require('bluebird')
const uuid = require('uuid')
const logger = require('./logger')
const tasksSchema = joi.object({
name: joi.string().required(),
exclusive: joi.bool(),
durable: joi.bool(),
autoDelete: joi.bool(),
jobSchema: joi.object({
isJoi: joi.bool().valid(true)
}).unknown()
})
const eventsSchema = joi.object({
name: joi.string().required(),
internal: joi.bool(),
durable: joi.bool(),
autoDelete: joi.bool(),
alternateExchange: joi.bool(),
jobSchema: joi.object({
isJoi: joi.bool().valid(true)
}).unknown()
})
const optsSchema = joi.object({
name: joi.string(),
hostname: joi.string(),
port: joi.number(),
username: joi.string(),
password: joi.string(),
log: joi.object().type(logger.constructor, 'Bunyan Logger'),
channelOpts: joi.object(),
tasks: joi.array().items(tasksSchema),
events: joi.array().items(eventsSchema)
}).or('tasks', 'events').required()
/**
* RabbitMQ model. Can be used independently for publishing or other uses.
*
* @author Bryan Kendall
* @param {Object} [opts] RabbitMQ connection options.
* @param {Object} [opts.channel] RabbitMQ channel options.
* @param {Object} [opts.channel.prefetch] Set prefetch for each consumer in a
* channel.
* @param {String} [opts.hostname=localhost] Hostname for RabbitMQ. Can be set
* with environment variable RABBITMQ_HOSTNAME.
* @param {Number} [opts.port=5672] Port for RabbitMQ. Can be set with
* environment variable RABBITMQ_PORT.
* @param {String} [opts.username] Username for RabbitMQ. Can be set with
* environment variable RABBITMQ_USERNAME.
* @param {String} [opts.password] Username for Password. Can be set with
* environment variable RABBITMQ_PASSWORD.
*/
class RabbitMQ {
static AMQPLIB_QUEUE_DEFAULTS: Object;
static AMQPLIB_EXCHANGE_DEFAULTS: Object;
channel: RabbitMQChannel;
channelOpts: Object;
connection: RabbitMQConnection;
consuming: Map<string, string>;
events: Array<string|Object>;
hostname: string;
log: Object;
name: string;
password: string;
port: number;
publishChannel: RabbitMQConfirmChannel;
subscribed: Set<string>;
subscriptions: Map<string, Function>;
tasks: Array<string|Object>;
username: string;
constructor (opts: Object) {
this.name = opts.name || 'ponos'
this.hostname = opts.hostname || process.env.RABBITMQ_HOSTNAME || 'localhost'
this.port = opts.port || parseInt(process.env.RABBITMQ_PORT, 10) || 5672
this.username = opts.username || process.env.RABBITMQ_USERNAME
this.password = opts.password || process.env.RABBITMQ_PASSWORD
this.log = opts.log || logger.child({ module: 'ponos:rabbitmq' })
this.channelOpts = opts.channel || {}
if (!this.username || !this.password) {
this.log.warn(
'RabbitMQ username and password not found. See Ponos Server ' +
'constructor documentation.'
)
}
this.tasks = opts.tasks || []
this.tasks = this.tasks.map(RabbitMQ._formatJobs)
this.events = opts.events || []
this.events = this.events.map(RabbitMQ._formatJobs)
this.log.trace({ opts: opts }, 'RabbitMQ constructor')
joi.assert(this, optsSchema)
this._setCleanState()
}
/**
* formats events and tasks to consistent format.
* add TID validation if not already there
* @param {Object|String} item task/job item from map
* @return {Object} formated job type
*/
static _formatJobs (item: string|Object): Object {
if (typeof item === 'string') {
return { name: item }
}
if (item.jobSchema) {
item.jobSchema = item.jobSchema.concat(joi.object({ tid: joi.string() }))
}
return item
}
/**
* Connect to the RabbitMQ server.
*
* @return {Promise} Promise that resolves once connection is established.
*/
connect (): Bluebird$Promise<void> {
if (this._isPartlyConnected() || this._isConnected()) {
return Promise.reject(new Error('cannot call connect twice'))
}
let authString = ''
if (this.username && this.password) {
authString = `${this.username}:${this.password}@`
}
const url = `amqp://${authString}${this.hostname}:${this.port}`
this.log.info({ url: url }, 'connecting')
return Promise
.resolve(amqplib.connect(url, {}))
.catch((err) => {
this.log.fatal({ err: err }, 'an error occured while connecting')
throw err
})
.then((conn) => {
this.connection = conn
this.log.info('connected')
this.connection.on('error', this._connectionErrorHandler.bind(this))
this.log.info('creating channel')
return Promise.resolve(this.connection.createChannel())
.catch((err) => {
this.log.fatal({ err: err }, 'an error occured creating channel')
throw err
})
})
.then((channel) => {
if (this.channelOpts.prefetch) {
this.log.info('setting prefetch on channel')
return Promise.resolve(channel.prefetch(this.channelOpts.prefetch))
.return((channel))
}
return channel
})
.then((channel) => {
this.log.info('created channel')
this.channel = channel
this.channel.on('error', this._channelErrorHandler.bind(this))
this.log.info('creating publishing channel')
return Promise.resolve(this.connection.createConfirmChannel())
.catch((err) => {
this.log.fatal({ err: err }, 'errored creating confirm channel')
throw err
})
})
.then((channel) => {
this.log.info('created confirm channel')
this.publishChannel = channel
this.publishChannel.on('error', this._channelErrorHandler.bind(this))
})
.then(() => {
return this._assertQueuesAndExchanges()
})
}
/**
* Asserts all passed queues and exchanges on channel
* @return {Promise} Promise resolved when everything is asserted
*/
_assertQueuesAndExchanges (): Bluebird$Promise<void> {
return Promise.each(this.events, (event) => {
if (typeof event === 'string') {
return this._assertExchange(event, 'fanout')
}
return this._assertExchange(event.name, 'fanout', event)
})
.then(() => {
return Promise.each(this.tasks, (task) => {
if (typeof task === 'string') {
return this._assertQueue(`${this.name}.${task}`)
}
return this._assertQueue(`${this.name}.${task.name}`, task)
})
})
.return()
}
/**
* Takes an object representing a message and sends it to a queue.
*
* @deprecated
* @param {String} queue Queue to receive the message.
* @param {Object} content Content to send.
* @return {Promise} Promise resolved when message is sent to queue.
*/
publishToQueue (queue: string, content: Object): Bluebird$Promise<void> {
return Promise.try(() => {
this.log.warn({
method: 'publishToQueue',
queue
}, 'rabbitmq.publishToQueue is deprecated. use `publishTask`.')
return this.publishTask(queue, content)
})
.return()
}
/**
* Takes an object representing a message and sends it to an exchange using
* a provided routing key.
*
* Note: Providing an empty string as the routing key is functionally the same
* as sending the message directly to a named queue. The function
* {@link RabbitMQ#publishToQueue} is preferred in this case.
*
* @deprecated
* @param {String} queue Exchange to receive the message.
* @param {String} routingKey Routing Key for the exchange.
* @param {Object} content Content to send.
* @return {Promise} Promise resolved when message is sent to the exchange.
*/
publishToExchange (
exchange: string,
routingKey: string,
content: Object
): Bluebird$Promise<void> {
return Promise.try(() => {
this.log.warn({
method: 'publishToExchange',
exchange
}, 'rabbitmq.publishToExchange is deprecated. use `publishEvent`.')
return this.publishEvent(exchange, content)
})
.return()
}
/**
* Takes an object representing a message and sends it to a task queue.
* appends passed in name to tasks
* @param {String} queue Task queue to receive the message.
* @param {Object} content Job to send.
* @param {Object} opts extra options for message.
* @return {Promise} Promise resolved when message is sent to queue.
*/
publishTask (queue: string, content: Object, opts?: Object): Bluebird$Promise<void> {
return Promise.try(() => {
const queueName = `${this.name}.${queue}`
this._validatePublish(queue, content, 'tasks')
const payload = RabbitMQ.buildJobPayload(content)
const meta = RabbitMQ.buildJobMeta(this.name, opts)
this.log.info({ queue: queueName, job: content, jobMeta: meta }, 'Publishing task')
this._incMonitor('task', queueName)
return Promise.resolve(
this.publishChannel.sendToQueue(queueName, payload, meta)
)
})
}
/**
* Sends an object representing a message to an exchange for the specified
* event.
*
* @param {String} queue Exchange to receive the message.
* @param {Object} content Content to send.
* @param {Object} opts extra options for message.
* @return {Promise} Promise resolved when message is sent to the exchange.
*/
publishEvent (exchange: string, content: Object, opts?: Object): Bluebird$Promise<void> {
return Promise.try(() => {
this._validatePublish(exchange, content, 'events')
const payload = RabbitMQ.buildJobPayload(content)
const meta = RabbitMQ.buildJobMeta(this.name, opts)
this.log.info({ event: exchange, job: content, jobMeta: meta }, 'Publishing event')
// events do not need a routing key (so we send '')
this._incMonitor('event', exchange)
return Promise.resolve(
this.publishChannel.publish(exchange, '', payload, meta)
)
})
}
/**
* Helper function calling `monitor.increment`. Monitor won't be called if
* `WORKER_MONITOR_DISABLED` is set.
*
* @private
* @param {String} type either event or task
* @param {String} name name of the event or task
*/
_incMonitor (type: string, name: string): void {
if (process.env.WORKER_MONITOR_DISABLED) {
return
}
const tags = {
type: type,
app_id: this.name,
name: name
}
monitor.increment('ponos.publish', tags)
}
/**
* Asserts exchanges on the channel.
*
* @param {String} name Exchange Name
* @param {String} type Type of exchange [topic|fanout]
* @param {Object} opts extra options for exchange
* @return {Promise} Promise resolved when exchange is created.
* @resolves {QueueObject} asserted exchange
*/
_assertExchange (name: string, type: string, opts?: Object): Bluebird$Promise<void> {
return Promise.resolve(
this.channel.assertExchange(
name,
type,
defaults(opts, RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS)
)
)
}
/**
* Asserts queue on the channel.
*
* @param {String} name Queue Name
* @param {Object} opts extra options for queue
* @return {Promise} Promise resolved when queue is created.
* @resolves {QueueObject} asserted queue
*/
_assertQueue (name: string, opts?: Object): Bluebird$Promise<QueueObject> {
return Promise.resolve(
this.channel.assertQueue(
name,
defaults(opts, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS)
)
)
}
/**
* Subscribe to a specific direct queue.
*
* @private
* @param {String} queue Queue name.
* @param {Function} handler Handler for jobs.
* @param {Object} [queueOptions] Options for the queue.
* @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
* @return {Promise} Promise that is resolved once queue is subscribed.
*/
subscribeToQueue (
queue: string,
handler: Function,
queueOptions?: Object
): Bluebird$Promise<void> {
const queueName = `${this.name}.${queue}`
const log = this.log.child({
method: 'subscribeToQueue',
queue: queueName
})
log.info('subscribing to queue')
if (!this._isConnected()) {
return Promise.reject(new Error('you must .connect() before subscribing'))
}
if (!isFunction(handler)) {
log.error('handler must be a function')
return Promise.reject(
new Error(`handler for ${queueName} must be a function`)
)
}
return Promise.try(() => {
log.trace('binding to queue')
this.subscriptions = this.subscriptions.set(queueName, handler)
this.subscribed = this.subscribed.add(`queue:::${queueName}`)
})
}
/**
* Subcribe to fanout exchange.
*
* @private
* @param {String} exchange Name of fanout exchange.
* @param {Function} handler Handler for jobs.
* @param {Object} [rabbitMQOptions] Options for the queues and exchanges.
* @param {Object} [rabbitMQOptions.queueOptions] Options for the queue.
* @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
* @param {Object} [rabbitMQOptions.exchangeOptions] Options for the exchange.
* @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
* @return {Promise} Promise resolved once subscribed.
*/
subscribeToFanoutExchange (
exchange: string,
handler: Function,
rabbitMQOptions?: RabbitMQOptions
): Bluebird$Promise<void> {
const log = this.log.child({
method: 'subscribeToFanoutExchange',
exchange,
rabbitMQOptions
})
log.info('subscribing to exchange')
const opts = {
exchange: exchange,
type: 'fanout',
handler: handler,
queueOptions: {},
exchangeOptions: {}
}
if (rabbitMQOptions && rabbitMQOptions.queueOptions) {
opts.queueOptions = rabbitMQOptions.queueOptions
}
if (rabbitMQOptions && rabbitMQOptions.exchangeOptions) {
opts.exchangeOptions = rabbitMQOptions.exchangeOptions
}
return this._subscribeToExchange(opts)
}
/**
* Subscribe to topic exchange.
*
* @private
* @param {String} exchange Name of topic exchange.
* @param {String} routingKey Routing key for topic exchange.
* @param {Function} handler Handler for jobs.
* @param {Object} [rabbitMQOptions] Options for the queues and exchanges.
* @param {Object} [rabbitMQOptions.exchangeOptions] Options for the exchange.
* @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
* @param {Object} [rabbitMQOptions.queueOptions] Options for the queue.
* @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
* @return {Promise} Promise resolved once subscribed.
*/
subscribeToTopicExchange (
exchange: string,
routingKey: string,
handler: Function,
rabbitMQOptions?: RabbitMQOptions
): Bluebird$Promise<void> {
const log = this.log.child({
method: 'subscribeToTopicExchange',
exchange,
rabbitMQOptions
})
log.info('subscribing to exchange')
const opts = {
exchange: exchange,
type: 'topic',
routingKey: routingKey,
handler: handler,
queueOptions: {},
exchangeOptions: {}
}
if (rabbitMQOptions && rabbitMQOptions.queueOptions) {
opts.queueOptions = rabbitMQOptions.queueOptions
}
if (rabbitMQOptions && rabbitMQOptions.exchangeOptions) {
opts.exchangeOptions = rabbitMQOptions.exchangeOptions
}
return this._subscribeToExchange(opts)
}
/**
* Start consuming from subscribed queues.
*
* @private
* @return {Promise} Promise resolved when all queues consuming.
*/
consume (): Bluebird$Promise<void> {
const log = this.log.child({ method: 'consume' })
log.info('starting to consume')
if (!this._isConnected()) {
return Promise.reject(new Error('you must .connect() before consuming'))
}
const subscriptions = this.subscriptions
this.subscriptions = new Immutable.Map()
const channel = this.channel
return Promise.map(subscriptions.keySeq(), (queue) => {
const handler = subscriptions.get(queue)
log.info({ queue: queue }, 'consuming on queue')
// XXX(bryan): is this valid? should I not be checking _this_.consuming?
if (this.consuming.has(queue)) {
log.warn({ queue: queue }, 'already consuming queue')
return
}
function wrapper (msg) {
let job
const jobMeta = msg.properties || {}
try {
job = JSON.parse(msg.content)
} catch (err) {
// relatively safe stringifying - could be buffer, could be invalid
log.error({ job: '' + msg.content }, 'content not valid JSON')
return channel.ack(msg)
}
handler(job, jobMeta, () => {
channel.ack(msg)
})
}
return Promise.resolve(this.channel.consume(queue, wrapper))
.then((consumeInfo) => {
this.consuming = this.consuming.set(queue, consumeInfo.consumerTag)
})
})
.return()
}
/**
* Unsubscribe and stop consuming from all queues.
*
* @private
* @return {Promise} Promise resolved when all queues canceled.
*/
unsubscribe (): Bluebird$Promise<void> {
const consuming = this.consuming
return Promise.map(consuming.keySeq(), (queue) => {
const consumerTag = consuming.get(queue)
return Promise.resolve(this.channel.cancel(consumerTag))
.then(() => {
this.consuming = this.consuming.delete(queue)
})
})
.return()
}
/**
* Disconnect from RabbitMQ.
*
* @return {Promise} Promise resolved when disconnected from RabbitMQ.
*/
disconnect (): Bluebird$Promise<void> {
if (!this._isPartlyConnected()) {
return Promise.reject(new Error('not connected. cannot disconnect.'))
}
return Promise.resolve(this.publishChannel.waitForConfirms())
.then(() => (Promise.resolve(this.connection.close())))
.then(() => (this._setCleanState()))
}
// Private Methods
/**
* Helper method to re-set the state of the model to be 'clean'.
*
* @private
*/
_setCleanState (): void {
delete this.channel
delete this.connection
this.subscriptions = new Immutable.Map()
this.subscribed = new Immutable.Set()
this.consuming = new Immutable.Map()
}
/**
* Error handler for the RabbitMQ connection.
*
* @private
* @throws {Error}
* @param {object} err Error object from event.
*/
_connectionErrorHandler (err: Error) {
this.log.fatal({ err: err }, 'connection has caused an error')
throw err
}
/**
* Error handler for the RabbitMQ channel.
*
* @private
* @throws {Error}
* @param {object} err Error object from event.
*/
_channelErrorHandler (err: Error) {
this.log.fatal({ err: err }, 'channel has caused an error')
throw err
}
/**
* Check to see if model is connected.
*
* @private
* @return {Boolean} True if model is connected and channel is established.
*/
_isConnected (): boolean {
return !!(this._isPartlyConnected() && this.channel && this.publishChannel)
}
/**
* Check to see if model is _partially_ connected. This means that the
* connection was established, but the channel was not.
*
* @private
* @return {Boolean} True if connection is established.
*/
_isPartlyConnected (): boolean {
return !!(this.connection)
}
/**
* Helper function to consolidate logic for subscribing to queues. Stores
* information about what is subscribed and is responsible for asserting
* exchanges and queues into existance.
*
* @private
* @param {Object} opts Object describing the exchange connection.
* @param {String} opts.exchange Name of exchange.
* @param {String} opts.handler Handler of jobs.
* @param {String} opts.type Type of exchange: 'fanout' or 'topic'.
* @param {Object} [opts.exchangeOptions] Options for the exchange.
* @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
* @param {Object} [opts.queueOptions] Options for the queue.
* @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
* @param {String} [opts.routingKey] Routing key for a topic exchange.
* @return {Promise} Promise resolved when subcribed to exchange.
*/
_subscribeToExchange (opts: SubscribeObject): Bluebird$Promise<void> {
const log = this.log.child({
method: '_subscribeToExchange',
opts: opts
})
log.info('subscribing to exchange')
if (!this._isConnected()) {
return Promise.reject(new Error('must .connect() before subscribing'))
}
if (opts.type === 'topic' && !opts.routingKey) {
return Promise.reject(new Error('routingKey required for topic exchange'))
}
let subscribedKey = `${opts.type}:::${opts.exchange}`
if (opts.type === 'topic' && opts.routingKey) {
subscribedKey = `${subscribedKey}:::${opts.routingKey}`
}
if (this.subscribed.has(subscribedKey)) {
log.warn(`already subscribed to ${opts.type} exchange`)
return Promise.resolve()
}
log.trace('asserting queue for exchange')
let queueName = `${this.name}.${opts.exchange}`
if (opts.type === 'topic' && opts.routingKey) {
queueName = `${queueName}.${opts.routingKey}`
}
return this._assertQueue(queueName, opts.queueOptions)
.then((queueInfo) => {
const queue = queueInfo.queue
log.info({ queue: queue }, 'queue asserted')
log.info('binding queue')
if (!opts.routingKey) {
opts.routingKey = ''
}
return Promise
.resolve(
this.channel.bindQueue(queue, opts.exchange, opts.routingKey)
)
.return(queue)
})
.then((queue) => {
log.info('bound queue')
this.subscriptions = this.subscriptions.set(queue, opts.handler)
this.subscribed = this.subscribed.add(subscribedKey)
})
}
static getKeyFromClsNamespace (key) {
const ns = getNamespace('ponos')
return ns && ns.get(key)
}
static buildJobPayload (content: Object) {
if (!content.tid) {
const tid = RabbitMQ.getKeyFromClsNamespace('tid')
content.tid = tid || uuid()
}
const stringContent = JSON.stringify(content)
return new Buffer(stringContent)
}
static buildJobMeta (name, opts) {
const jobMeta = defaults({
appId: name,
timestamp: Date.now()
}, opts || {})
if (jobMeta.headers == null) {
jobMeta.headers = {}
}
jobMeta.headers.publisherWorkerName = RabbitMQ.getKeyFromClsNamespace('currentWorkerName')
return jobMeta
}
/**
* Validate publish params. Adds a TID to the job it does not already have
* one.
* @private
* @param {String} name Name of queue or exchange.
* @param {Object} content Content to send.
* @param {String} type type of job to validate (tasks|events).
* @throws {Error} Must be connected to RabbitMQ.
* @throws {Error} Name must be a non-empty string.
* @throws {Error} Object must be an Object.
* @throws {Error} Joi validation error if jobSchema is provided and job is invalid
*/
_validatePublish (name: string, content: Object, type: string): void {
if (!this._isConnected()) {
throw new Error('you must call .connect() before publishing')
}
// flowtype does not prevent users from using this function incorrectly.
if (!isString(name) || name === '') {
throw new Error('name must be a string')
}
if (!isObject(content)) {
throw new Error('content must be an object')
}
// $FlowIgnore: flow does not understand dynamic keys
const job = this[type].find(hasKeypaths({ name: name }))
if (!job) {
throw new Error(`${type}: "${name}" not defined in constructor`)
}
if (job.jobSchema) {
joi.assert(content, job.jobSchema)
}
}
}
/**
* Default options provided for asserted queues.
*
* Reference the [amqplib docs]{@link
* http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue}
* for more information.
*
* @typedef AMQPLIB_QUEUE_DEFAULTS
* @const {Object}
* @property {Boolean} autoDelete=false Delete queue when it has 0 consumers.
* @property {Boolean} durable=true Queue survives broker restarts.
* @property {Boolean} exclusive=false Scopes the queue to the connection.
*/
RabbitMQ.AMQPLIB_QUEUE_DEFAULTS = {
exclusive: false,
durable: true,
autoDelete: false
}
/**
* Default options provided for asserted exchanges.
*
* Reference the [amqplib docs]{@link
* http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange}
* for more information.
*
* @typedef AMQPLIB_EXCHANGE_DEFAULTS
* @const {Object}
* @property {Boolean} autoDelete=false Delete exchange when it has 0 bindings.
* @property {Boolean} durable=true Queue survives broker restarts.
* @property {Boolean} internal=false Messages cannot be published directly to
* the exchange.
*/
RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS = {
durable: true,
internal: false,
autoDelete: false
}
/**
* RabbitMQ model.
*
* @module ponos/lib/rabbitmq
* @see RabbitMQ
*/
module.exports = RabbitMQ