
View on GitHub


1 hr
Test Coverage
const rabbit = require('rabbit.js')
const _ = require('lodash')
import uuid from 'node-uuid'

import PersistenceHandler from './handlers/persistence'

 * Implementation of the sails-rabbitmq Adapter
const Adapter = {

   * Set the primary key datatype for the persistence datastore from config/rabbit.js.
  pkFormat: _.get(global, [ 'sails', 'config', 'rabbitmq', 'pkFormat'], 'integer'),

   * Local connections store
  connections: new Map(),

   * Adapter default configuration
  defaults: {
    url: 'amqp://localhost',
    schema: false

   * This method runs when a model is initially registered
   * at server-start-time.  This is the only required method.
   * @param  {[type]}   connection [description]
   * @param  {[type]}   collection [description]
   * @param  {Function} cb         [description]
   * @return {[type]}              [description]
  registerConnection (connection, collections, cb) {
    if (!connection.identity) return cb(new Error('Connection is missing an identity.'))
    if (this.connections.get(connection.identity)) return cb(new Error('Connection is already registered.'))

    let context = rabbit.createContext(connection.url)

    let config = {
      identity: connection.identity,
      context: context,
      models: new Map(_.pairs(collections)),
      userSockets: new Set(),
      sockets: {
        publish: new Map(),
        push: new Map()
      persistence: connection.persistence
    this.connections.set(connection.identity, config)

    context.on('error', err => {

    context.once('ready', () => {
      this.setupConnection(connection, collections, config)

  setupConnection (connection, collections, config) {

    return Promise
      .all(, model => {
        return this.setupConnectionSockets(connection, model, config)
      .then(() => {
        if (config.persistence) {
          return Promise.all(, model => {
            new PersistenceHandler(connection, model)
      .then(() => {
        return Promise.resolve()

  setupConnectionSockets (connection, model, config) {
    return Promise.all([

      this.getPublishSocket(connection.identity, model.identity)
        .then(pubSocket => {
          config.sockets.publish.set(model.identity, pubSocket)

      this.getPushSocket(connection.identity, model.identity, { name: 'persistence' })
        .then(pushSocket => {
          config.sockets.push.set(model.identity, pushSocket)

   * Fired when a model is unregistered, typically when the server
   * is killed. Useful for tearing-down remaining open connections,
   * etc.
   * @param  {Function} cb [description]
   * @return {[type]}      [description]
  teardown (conn, cb) {
    if (_.isFunction(conn)) {
      cb = conn
      conn = null

    let connections = conn ? [ conn ].values() : this.connections.values()

    for (let c of connections) {
      for (let socket of c.sockets.publish.values()) socket.close()
      for (let socket of c.sockets.push.values()) socket.close()
      for (let socket of c.userSockets.values()) socket.close()


   * @override
  create (connection, collection, values, cb) {
    this.update(connection, collection, { where: values }, values, cb)

   * @override
  update (connection, collection, criteria, values, cb) {
    let config = this.connections.get(connection)
    let pushSocket = this.getSocket(connection, collection, 'push')

    this.getSubscribeSocket(connection, collection, criteria)
      .then(subSocket => {
        let uniqueId = uuid.v4()

        values.rabbitmqPublishId = uniqueId
        subSocket.on('data', data => {
          let model = JSON.parse(data)
          let modelUniqueId = _.isArray(model) ? model[0].rabbitmqPublishId : model.rabbitmqPublishId
          if (modelUniqueId == uniqueId) {
              delete model.rabbitmqPublishId
              cb(null, model)

        pushSocket.write(JSON.stringify(values), 'utf8')

   * Publish a message to an exchange. Accepts the same arguments as the
   * update() method in the "semantic" interface.
   * Exchange configuration defined in the adapter's connection config object
   * determines how the message is handled/routed by the exchange.
  publish (connection, collection, values) {
    let socket = this.getSocket(connection, collection, 'publish')
    let routingKey = this.getRoutingKey(connection, collection, values)

    socket.publish(routingKey, JSON.stringify(values), 'utf8')

    return Promise.resolve(socket)

   * Setup and connect a PUBLISH socket for the specified model
  getPublishSocket (connection, collection) {
    let config = this.connections.get(connection)
    let context = config.context
    let address = this.getExchangeName(collection)
    let socket = context.socket('PUBLISH', { routing: 'topic' })

    return new Promise((resolve, reject) => {
      socket.connect(address, () => {

   * Setup and connect a SUBSCRIBE socket for the specified model
  getSubscribeSocket (connection, collection, options = { }) {
    let config = this.connections.get(connection)
    let context = config.context
    let address = this.getExchangeName(collection)
    let routingKey = options.routingKey || this.getRoutingKey(connection, collection, options.where)
    let socket = context.socket('SUBSCRIBE', { routing: 'topic' })

    socket.once('close', () => {

    return new Promise((resolve, reject) => {
      socket.connect(address, routingKey, () => {

   * Setup and connect a PUSH socket for the specified model
  getPushSocket (connection, collection, options = { }) {
    let config = this.connections.get(connection)
    let context = config.context
    let address = this.getQueueName(collection,
    let socket = context.socket('PUSH')

    return new Promise((resolve, reject) => {
      socket.connect(address, () => {

   * Setup and connect a WORKER socket for the specified model
  getWorkerSocket (connection, collection, options = { }) {
    let config = this.connections.get(connection)
    let context = config.context
    let address = this.getQueueName(collection,
    let socket = context.socket('WORKER')

    socket.once('close', () => {
    return new Promise((resolve, reject) => {
      socket.connect(address, () => {

   * Return an extant socket of the specific type for the specified model
  getSocket (connectionId, collection, type) {
    let connection = this.connections.get(connectionId)
    return connection.sockets[type].get(collection)

   * Return the name of the AMQP exchange that is used by the specified model
  getExchangeName (model) {
    return `sails.models.${model}`

   * Return the name of the AMQP queue that is used by the specified model
   * in conjuction with a particular type of work(er)
  getQueueName (model, name) {
    if (_.isUndefined(name)) {
      throw new Error('name cannot be undefined in getQueueName')
    return `sails.models.${model}.${name}`

   * Return AMQP routing key for a given Model instance
   * @return dot-delimited string of model attributes which constitutes the
   *    queue routing key
  getRoutingKey (connection, collection, values) {
    let config = this.connections.get(connection)
    let Model = config.models.get(collection)
    if (_.isUndefined(values)) {
      return '#'
    else if (!_.isArray(Model.routingKey)) {
      throw new Error(
        `The model ${Model.identity} must define a routingKey
        in order to be used with the Waterline pubsub interface`
    else {
      return this.parseRoutingKey(Model.routingKey, values)

   * @return a rabbitmq routing key derived from a list of model attributes
  parseRoutingKey (routingKey, values) {
    return => { return values[attribute] }).join('.')

   * Return a model's connection that will be used for persistence, if it
   * exists.
  getPersistenceConnection (connection, collection) {
    let config = this.connections.get(connection)
    let Model = config.models.get(collection)

    let persistenceConnections = _.without(Model.connection, connection)

    return persistenceConnections[0]


export default Adapter