tjwebb/sails-rabbitmq

View on GitHub
lib/adapter.js

Summary

Maintainability
A
1 hr
Test Coverage
const rabbit = require('rabbit.js')
const _ = require('lodash')
import uuid from 'node-uuid'

import PersistenceHandler from './handlers/persistence'

/**
 * Implementation of the sails-rabbitmq Adapter
 */
const Adapter = {

  /**
   * Primary key datatype for the persistence datastore. Read once, when this
   * module is loaded, from `sails.config.rabbitmq.pkFormat` on the global
   * object; falls back to 'integer' when that path is not set.
   */
  pkFormat: _.get(global, [ 'sails', 'config', 'rabbitmq', 'pkFormat'], 'integer'),

  /**
   * Local connections store: connection identity -> the config object built
   * by registerConnection() (context, models, sockets, ...).
   */
  connections: new Map(),

  /**
   * Adapter default configuration
   */
  defaults: {
    url: 'amqp://localhost',
    schema: false
  },

  /**
   * Register a connection and open the per-model sockets it needs.
   *
   * Creates a rabbit.js context for `connection.url`, stores a config object
   * under `connection.identity`, and, once the context is ready, runs
   * setupConnection(). `cb` is invoked at most once by this method.
   *
   * @param  {Object}   connection  connection config; `identity` is required,
   *                                `url` and `persistence` are read as well
   * @param  {Object}   collections models keyed by identity
   * @param  {Function} cb          called with an Error on failure, or with
   *                                `undefined` once setup has finished
   */
  registerConnection (connection, collections, cb) {
    if (!connection.identity) return cb(new Error('Connection is missing an identity.'))
    if (this.connections.get(connection.identity)) return cb(new Error('Connection is already registered.'))

    const context = rabbit.createContext(connection.url)

    const config = {
      identity: connection.identity,
      context: context,
      models: new Map(_.pairs(collections)),
      userSockets: new Set(),
      sockets: {
        publish: new Map(),
        push: new Map()
      },
      persistence: connection.persistence
    }
    this.connections.set(connection.identity, config)

    // Guard so that cb fires exactly once, whichever of "setup finished",
    // "setup failed" or "context errored" happens first.
    let settled = false
    const finish = err => {
      if (settled) return
      settled = true
      cb(err)
    }

    context.on('error', err => {
      console.error(err)
      // An error raised before registration completes would otherwise leave
      // the caller waiting forever; later errors are only logged.
      finish(err)
    })

    context.once('ready', () => {
      // Two-argument then(): a throw from inside cb must not be routed back
      // into cb a second time, which `.then(cb).catch(cb)` would do.
      this.setupConnection(connection, collections, config)
        .then(() => finish(), finish)
    })
  },

  /**
   * Open the publish/push sockets of every model and then, when
   * `config.persistence` is truthy, create one PersistenceHandler per model.
   *
   * @param  {Object} connection  connection config
   * @param  {Object} collections models keyed by identity
   * @param  {Object} config      config object stored in `this.connections`
   * @return {Promise} resolves with `undefined` when setup is done
   */
  setupConnection (connection, collections, config) {
    return Promise
      .all(_.map(collections, model => this.setupConnectionSockets(connection, model, config)))
      .then(() => {
        if (!config.persistence) return

        // Handlers are created purely for their side effects; the
        // constructor's return value is not awaited (it never was).
        _.each(collections, model => {
          new PersistenceHandler(connection, model)
        })
      })
  },

  /**
   * Open the PUBLISH socket and the 'persistence' PUSH socket for one model
   * and record both in `config.sockets`, keyed by the model identity.
   *
   * @param  {Object} connection connection config (`identity` is used)
   * @param  {Object} model      model (`identity` is used)
   * @param  {Object} config     config object stored in `this.connections`
   * @return {Promise} resolves once both sockets are stored
   */
  setupConnectionSockets (connection, model, config) {
    return Promise.all([

      this.getPublishSocket(connection.identity, model.identity)
        .then(pubSocket => {
          config.sockets.publish.set(model.identity, pubSocket)
        }),

      this.getPushSocket(connection.identity, model.identity, { name: 'persistence' })
        .then(pushSocket => {
          config.sockets.push.set(model.identity, pushSocket)
        })
    ])
  },

  /**
   * Close the sockets of one connection, or of every registered connection,
   * and remove the connection(s) from the local store.
   *
   * @param  {String|Object} [conn] identity of the connection to tear down.
   *                                A config object as stored in
   *                                `this.connections` is accepted too. When
   *                                omitted or falsy, all connections are
   *                                torn down. An unknown identity is a no-op.
   * @param  {Function}      cb     called with no arguments when done
   */
  teardown (conn, cb) {
    if (_.isFunction(conn)) {
      cb = conn
      conn = null
    }

    let targets
    if (!conn) {
      // Snapshot, because entries are deleted from the map inside the loop.
      targets = [ ...this.connections.values() ]
    }
    else {
      // The store is keyed by identity everywhere else in this adapter, so
      // resolve the identity to its config instead of iterating over it.
      const identity = _.isString(conn) ? conn : conn.identity
      const registered = this.connections.get(identity)

      if (registered) {
        targets = [ registered ]
      }
      else {
        // Previous behaviour: a bare config object was used as-is.
        targets = conn.sockets ? [ conn ] : [ ]
      }
    }

    for (const c of targets) {
      for (const socket of c.sockets.publish.values()) socket.close()
      for (const socket of c.sockets.push.values()) socket.close()
      for (const socket of c.userSockets.values()) socket.close()

      this.connections.delete(c.identity)
    }
    cb()
  },

  /**
   * Create a record. Delegates to update(), using the new values themselves
   * as the `where` criteria (and therefore as the source of the routing key).
   *
   * @override
   */
  create (connection, collection, values, cb) {
    this.update(connection, collection, { where: values }, values, cb)
  },

  /**
   * Push `values` onto the model's push socket and wait, on a temporary
   * SUBSCRIBE socket, for a message carrying the same correlation id.
   *
   * Side effect: a `rabbitmqPublishId` property is set on `values` before it
   * is serialized. The id is removed from the record(s) handed to `cb`.
   *
   * @override
   * @param  {String}   connection connection identity
   * @param  {String}   collection model identity
   * @param  {Object}   criteria   passed to getSubscribeSocket() as options
   *                               (`routingKey` / `where`)
   * @param  {Object}   values     attributes to send
   * @param  {Function} cb         cb(err) on failure, cb(null, model) with the
   *                               parsed reply on success
   */
  update (connection, collection, criteria, values, cb) {
    let pushSocket
    let subscription

    // This is a callback-style API: report lookup/routing-key failures through
    // cb rather than throwing synchronously at the caller.
    try {
      pushSocket = this.getSocket(connection, collection, 'push')
      subscription = this.getSubscribeSocket(connection, collection, criteria)
    }
    catch (err) {
      return cb(err)
    }

    subscription
      .then(subSocket => {
        const uniqueId = uuid.v4()

        values.rabbitmqPublishId = uniqueId
        subSocket.on('data', data => {
          let model
          try {
            model = JSON.parse(data)
          }
          catch (err) {
            // A malformed message on the exchange is not ours to fail on and
            // must not surface as an uncaught exception in the emitter.
            console.error(err)
            return
          }

          const records = _.isArray(model) ? model : [ model ]
          const first = records[0]

          // Not the reply we are waiting for (also covers `null` and `[]`).
          if (!first || first.rabbitmqPublishId !== uniqueId) return

          subSocket.close()
          // Strip the correlation id from every returned record; deleting it
          // from the array object itself would leave the elements untouched.
          records.forEach(record => {
            if (record) delete record.rabbitmqPublishId
          })
          cb(null, model)
        })

        pushSocket.write(JSON.stringify(values), 'utf8')
      })
      .catch(cb)
  },

  /**
   * Publish a message to an exchange. Accepts the same arguments as the
   * update() method in the "semantic" interface.
   *
   * Exchange configuration defined in the adapter's connection config object
   * determines how the message is handled/routed by the exchange.
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @param  {Object} values     message body; also the source of the routing key
   * @return {Promise} resolves with the publish socket that was written to
   */
  publish (connection, collection, values) {
    const socket = this.getSocket(connection, collection, 'publish')
    const routingKey = this.getRoutingKey(connection, collection, values)

    socket.publish(routingKey, JSON.stringify(values), 'utf8')

    return Promise.resolve(socket)
  },

  /**
   * Setup and connect a PUBLISH socket for the specified model
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @return {Promise} resolves with the socket once connect() calls back
   */
  getPublishSocket (connection, collection) {
    const config = this.connections.get(connection)
    const context = config.context
    const address = this.getExchangeName(collection)
    const socket = context.socket('PUBLISH', { routing: 'topic' })

    return new Promise(resolve => {
      socket.connect(address, () => {
        resolve(socket)
      })
    })
  },

  /**
   * Setup and connect a SUBSCRIBE socket for the specified model. The socket
   * is tracked in the connection's `userSockets` while it is open.
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @param  {Object} [options]  `routingKey` to subscribe with; otherwise one
   *                             is derived from `options.where`
   * @return {Promise} resolves with the socket once connect() calls back
   */
  getSubscribeSocket (connection, collection, options = { }) {
    const config = this.connections.get(connection)
    const context = config.context
    const address = this.getExchangeName(collection)
    const routingKey = options.routingKey || this.getRoutingKey(connection, collection, options.where)
    const socket = context.socket('SUBSCRIBE', { routing: 'topic' })

    socket.setEncoding('utf8')
    socket.once('close', () => {
      config.userSockets.delete(socket)
    })

    return new Promise(resolve => {
      socket.connect(address, routingKey, () => {
        config.userSockets.add(socket)
        resolve(socket)
      })
    })
  },

  /**
   * Setup and connect a PUSH socket for the specified model
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @param  {Object} [options]  `name` selects the queue (required by
   *                             getQueueName())
   * @return {Promise} resolves with the socket once connect() calls back
   */
  getPushSocket (connection, collection, options = { }) {
    const config = this.connections.get(connection)
    const context = config.context
    const address = this.getQueueName(collection, options.name)
    const socket = context.socket('PUSH')

    return new Promise(resolve => {
      socket.connect(address, () => {
        resolve(socket)
      })
    })
  },

  /**
   * Setup and connect a WORKER socket for the specified model. The socket is
   * tracked in the connection's `userSockets` while it is open.
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @param  {Object} [options]  `name` selects the queue (required by
   *                             getQueueName())
   * @return {Promise} resolves with the socket once connect() calls back
   */
  getWorkerSocket (connection, collection, options = { }) {
    const config = this.connections.get(connection)
    const context = config.context
    const address = this.getQueueName(collection, options.name)
    const socket = context.socket('WORKER')

    socket.setEncoding('utf8')
    socket.once('close', () => {
      config.userSockets.delete(socket)
    })
    return new Promise(resolve => {
      socket.connect(address, () => {
        config.userSockets.add(socket)
        resolve(socket)
      })
    })
  },

  /**
   * Return an extant socket of the specific type for the specified model
   *
   * @param  {String} connectionId connection identity
   * @param  {String} collection   model identity
   * @param  {String} type         key of `config.sockets` ('publish' or 'push')
   * @return the stored socket, or `undefined` if none is stored for the model
   */
  getSocket (connectionId, collection, type) {
    const connection = this.connections.get(connectionId)
    return connection.sockets[type].get(collection)
  },

  /**
   * Return the name of the AMQP exchange that is used by the specified model
   */
  getExchangeName (model) {
    return `sails.models.${model}`
  },

  /**
   * Return the name of the AMQP queue that is used by the specified model
   * in conjunction with a particular type of work(er)
   *
   * @throws {Error} when `name` is undefined
   */
  getQueueName (model, name) {
    if (_.isUndefined(name)) {
      throw new Error('name cannot be undefined in getQueueName')
    }
    return `sails.models.${model}.${name}`
  },

  /**
   * Return AMQP routing key for a given Model instance
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   * @param  {Object} [values]   attribute values; when null or undefined the
   *                             catch-all key '#' is returned
   * @return dot-delimited string of model attributes which constitutes the
   *    queue routing key
   * @throws {Error} when values are given but the model has no `routingKey`
   *    array
   */
  getRoutingKey (connection, collection, values) {
    const config = this.connections.get(connection)
    const Model = config.models.get(collection)

    // `== null` on purpose: null has no attributes to read either, and would
    // otherwise crash in parseRoutingKey().
    if (values == null) {
      return '#'
    }
    else if (!_.isArray(Model.routingKey)) {
      throw new Error(
        `The model ${Model.identity} must define a routingKey
        in order to be used with the Waterline pubsub interface`
      )
    }
    else {
      return this.parseRoutingKey(Model.routingKey, values)
    }
  },

  /**
   * @param  {Array}  routingKey attribute names, in key order
   * @param  {Object} values     attribute values
   * @return a rabbitmq routing key derived from a list of model attributes:
   *    the value of each listed attribute, joined with '.'
   */
  parseRoutingKey (routingKey, values) {
    return routingKey.map(attribute => values[attribute]).join('.')
  },

  /**
   * Return a model's connection that will be used for persistence, if it
   * exists: the first entry of the model's `connection` list that is not
   * this adapter's own connection.
   *
   * @param  {String} connection connection identity
   * @param  {String} collection model identity
   */
  getPersistenceConnection (connection, collection) {
    const config = this.connections.get(connection)
    const Model = config.models.get(collection)

    const persistenceConnections = _.without(Model.connection, connection)

    return persistenceConnections[0]
  }
}

// Bind the adapter's methods to the Adapter object so that `this` inside them
// still refers to Adapter when a method is passed around detached.
// NOTE(review): calling bindAll without method names relies on lodash 3.x
// behaviour (the file also uses the 3.x `_.pairs`) — confirm before upgrading.
_.bindAll(Adapter)

export default Adapter