octoblu/meshblu-core-protocol-adapter-http-streaming

View on GitHub
src/controllers/messenger-controller.coffee

Summary

Maintainability
Test Coverage
_                 = require 'lodash'
debug             = require('debug')('meshblu-server-http:messenger-controller')
{Readable}        = require 'stream'
MeshbluAuthParser = require '../helpers/meshblu-auth-parser'

# HTTP controller that answers subscription requests with a long-lived,
# newline-delimited JSON stream fed by a messenger instance.
class MessengerController
  # Options (all stored on the instance):
  #   jobManager              - used via `do job, callback` to run the authorization job
  #   jobToHttp               - used for httpToJob, sendJobResponse and metadataToHeaders
  #   messengerManagerFactory - `build()` is called once per accepted subscription
  constructor: ({@jobManager, @jobToHttp, @messengerManagerFactory}) ->
    @authParser = new MeshbluAuthParser

  # Subscribe the authenticated device to its own messages. The requested
  # `types` come from the query string or body (body wins on conflict).
  subscribeSelf: (req, res) =>
    auth = @authParser.parse req
    {types} = _.extend {}, req.query, req.body
    @_subscribe {req, res, toUuid: auth.uuid, types: @_normalizeTypes(types)}

  # Subscribe to the device named by `req.params.uuid`, using the requested
  # `types` from the query string or body (body wins on conflict).
  subscribe: (req, res) =>
    # Result is not used here; the call is kept so any effect of parsing
    # the request is preserved.
    @authParser.parse req
    {types} = _.extend {}, req.query, req.body
    @_subscribe {req, res, toUuid: req.params.uuid, types: @_normalizeTypes(types)}

  # Subscribe to 'broadcast' messages only for `req.params.uuid`.
  subscribeBroadcast: (req, res) =>
    @authParser.parse req
    @_subscribe {req, res, toUuid: req.params.uuid, types: ['broadcast']}

  # Subscribe to 'sent' messages only for `req.params.uuid`.
  subscribeSent: (req, res) =>
    @authParser.parse req
    @_subscribe {req, res, toUuid: req.params.uuid, types: ['sent']}

  # Subscribe to 'received' messages only for `req.params.uuid`.
  subscribeReceived: (req, res) =>
    @authParser.parse req
    @_subscribe {req, res, toUuid: req.params.uuid, types: ['received']}

  # Build the list of subscription types for the general subscribe routes.
  #
  # types - undefined/null, a single type string, or an array of type strings.
  #
  # Returns a new array: the requested types (or the default
  # sent/received/broadcast set) followed by 'config' and 'data'.
  _normalizeTypes: (types) =>
    types ?= ['sent', 'received', 'broadcast']
    # A single value arrives as a bare string; wrap it BEFORE appending,
    # otherwise the array operation below would throw on a string.
    types = [types] if _.isString types
    # concat returns a copy, so the array owned by req.query / req.body
    # is not mutated.
    types.concat ['config', 'data']

  # Authorize the subscription through a GetAuthorizedSubscriptionTypes job
  # and, when the job answers with code 204, stream messenger events to `res`.
  #
  # req, res - the HTTP request / response
  # toUuid   - uuid being subscribed to
  # types    - requested subscription types; written onto req.body.types
  #            before the job is built from the request
  _subscribe: ({req, res, toUuid, types}) =>
    req.body ?= {}
    req.body.types = types
    {topics} = _.extend {}, req.query, req.body

    job = @jobToHttp.httpToJob jobType: 'GetAuthorizedSubscriptionTypes', request: req, toUuid: toUuid

    @jobManager.do job, (error, jobResponse) =>
      return res.sendError error if error?

      # Any code other than 204 is relayed to the client as a normal response.
      if jobResponse?.metadata?.code != 204
        return @jobToHttp.sendJobResponse {jobResponse, res}

      res.type 'application/json'
      res.set {
        'Connection': 'keep-alive'
      }

      res.set @jobToHttp.metadataToHeaders jobResponse.metadata
      messenger = @messengerManagerFactory.build()

      # Push-only stream: _read is a no-op because data is pushed as
      # messenger events arrive rather than pulled on demand.
      readStream = new Readable
      readStream._read = _.noop
      readStream.pipe res

      messenger.connect (error) =>
        return res.sendError error if error?
        # Subscribe to the types carried in the job's response payload,
        # not to the raw requested list.
        {types} = JSON.parse jobResponse.rawData

        _.each types, (type) =>
          messenger.subscribe {type, topics, uuid: toUuid}
          return # subscribe sometimes returns false

      # message / config / data events are all forwarded the same way:
      # one JSON document per line.
      _.each ['message', 'config', 'data'], (eventName) =>
        messenger.on eventName, (channel, message) =>
          payload = JSON.stringify message
          debug "on #{eventName}", payload
          readStream.push payload + '\n'
        return # never hand the return value of `on` back to _.each

      messenger.on 'error', (error) =>
        messenger.close()

      # Release the messenger once the client connection goes away.
      res.on 'close', ->
        messenger.close()

# The controller class is this module's sole export.
module.exports = MessengerController