trailsjs/trailpack-pubsub

View on GitHub
lib/PubSub.js

Summary

Maintainability
A
0 mins
Test Coverage
'use strict'

const EventEmitter = require('events')
const _ = require('lodash')
const redis = require('redis')

module.exports = class PubSub extends EventEmitter {

  constructor (config) {
    super()
    this._config = config // eslint-disable-line
    this.connected = false
    this._pubClient = null
    this._subClient = null
  }

  /**
   * Create a new connection to Redis server
   * @returns {Promise}
   */
  createConnection (config) {
    return new Promise((resolve, reject) => {
      const connection = redis.createClient(config)
      connection.on('ready', () => resolve(connection))
      connection.on('error', err => reject(err))
    })
  }

  /**
   * Handler for any message
   * @param {String} channel
   * @param {String|Object} message
   */
  _onMessage (channel, message) {
    try {
      message = JSON.parse(message)
    }
    catch (e) {
      // ... ignoring
    }
    if (_.isFunction(this._config.handlers.onMessage)) {
      this._config.handlers.onMessage(channel, message)
    }
    // fire event
    this.emit(channel, message)
  }

  /**
   * Will subscribe on all events and channels
   */
  _subscribe () {
    if (!this.connected)
      throw new Error('No connection exist')

    this.subClient.on('message', this._onMessage.bind(this))

    if (_.isFunction(this._config.handlers.onError)) {
      this.subClient.on('error', this._config.handlers.onError)
    }
  }

  /**
   * Create new connections
   * @returns {Promise}
   */
  connect () {
    return Promise.all([
      this.createConnection(this._config.connection),
      this.createConnection(this._config.connection)
    ])
    .then((connections) => {
      this._pubClient = connections[0]
      this._subClient = connections[1]
      this.connected = true
    })
    .then(() => this._subscribe())
  }

  /**
   * Will close existing connection
   * @param {Client} client
   * @returns {Promise}
   */
  closeConnection (client) {
    return new Promise((resolve, reject) => {
      client.on('end', () => resolve())
      client.on('error', (err) => reject(err))
      client.unsubscribe()
      client.quit()
    })
  }

  /**
   * Destroy connections
   * @returns {Promise}
   */
  disconnect () {
    return Promise.all([
      this.closeConnection(this._pubClient),
      this.closeConnection(this._subClient)
    ])
    .then(() => {
      this.connected = false
    })
  }

  /**
   * Publish client
   * @return {Client?}
   */
  get pubClient () {
    return this._pubClient
  }

  /**
   * Sub client
   * @return {Client?}
   */
  get subClient () {
    return this._subClient
  }

  /**
   * Publish message to channel
   * @param {String} channel
   * @param {String|Object} data
   */
  publish (channel, data) {
    if (!channel)
      channel = this._config.defaultChannel

    if (_.isObject(data)) {
      try {
        data = JSON.stringify(data)
      }
      catch (e) {
        // ... ignore
      }
    }

    this.pubClient.publish(channel, data || '') //failsafe no need to send undefined
  }

  /**
   * Rewrite on to be able to subscribe to channel
   */
  on (channel, cb) {
    this.subClient.subscribe(channel)
    super.on(channel, cb)
  }
}