// micro-toolkit/event-bus-zeromq
//
// View on GitHub
// test/subscriber_test.js
//
// Summary
//
// Maintainability: F (1 wk)
// Test Coverage
var uuidGen = require('uuid')
var zmq = require('zeromq')
var Logger = require('../logger')
var toFrames = require('./support/frames_helper').toFrames
var toDataFrames = require('./support/frames_helper').toDataFrames
var logHelper = require('./support/log_helper')
var zmqHelper = require('./support/zmq_helper')
var _ = require('lodash')
var eventInstanceMemoryFactory = require('./support/memory_event_store')

describe('Subscriber Module', function () {
  var subStub, dealerStub, log, subscriber, config

  before(function () {
    // The subscriber module obtains its logger while it is being loaded, so
    // Logger.getLogger must be stubbed first and the module required afterwards.
    log = logHelper.getLogStub()
    var getLoggerStub = sinon.stub(Logger, 'getLogger')
    getLoggerStub.returns(log)

    subscriber = require('../lib/subscriber')
  })

  after(function () {
    // Undo the Logger.getLogger stub installed by the suite-level before hook.
    var loggerModule = Logger
    loggerModule.getLogger.restore()
  })

  beforeEach(function () {
    // One socket double per stream, recreated for every test.
    dealerStub = zmqHelper.getSocketStub()
    subStub = zmqHelper.getSocketStub()

    // zmq.socket hands out the matching double depending on the socket type.
    var socketFactory = sinon.stub(zmq, 'socket')
    socketFactory.withArgs('dealer').returns(dealerStub)
    socketFactory.withArgs('sub').returns(subStub)

    // Each test gets its own store instance from the memory factory.
    config = {
      store: { instance: eventInstanceMemoryFactory.getInstance() }
    }
  })

  afterEach(function () {
    // zmq.socket is stubbed by beforeEach for every test.
    zmq.socket.restore()

    // uuidGen.v4 is only stubbed by some tests; restore it when it was.
    var uuidWasStubbed = Boolean(uuidGen.v4.restore)
    if (!uuidWasStubbed) { return }
    uuidGen.v4.restore()
  })

  describe('#getInstance', function () {
    // NOTE(review): the disabled test below still refers to `publisher` and
    // 'micro.bus.publisher'; it needs porting to the subscriber before it can
    // be enabled.
    // it('should obtain the logger micro.bus.publisher', function() {
    //   publisher.getInstance(config)
    //   Logger.getLogger.should.have.been.calledWith('micro.bus.publisher')
    // })

    // Every case here only needs an instance built from the per-test config.
    function createInstance () {
      return subscriber.getInstance(config)
    }

    describe('snapshot stream', function () {
      it('open a dealer 0MQ socket', function () {
        createInstance()

        zmq.socket.should.have.been.calledWith('dealer')
      })

      it('set 0MQ socket identity with unique generated value', function () {
        // the stub is removed again by the suite-level afterEach hook
        sinon.stub(uuidGen, 'v4').returns('uuid')

        createInstance()

        dealerStub.identity.should.be.eq('uuid')
      })

      it('should handle socket messages', function () {
        createInstance()

        dealerStub.on.should.have.been.calledWith('message', match.func)
      })
    })

    describe('subscriber stream', function () {
      it('should open a sub 0MQ socket', function () {
        createInstance()

        zmq.socket.should.have.been.calledWith('sub')
      })

      it('should detach subscriber socket from event loop', function () {
        createInstance()

        subStub.unref.should.have.been.called
      })

      it('should handle socket messages', function () {
        createInstance()

        subStub.on.should.have.been.calledWith('message', match.func)
      })

      it('should set keep alive on the socket', function () {
        createInstance()

        subStub.setsockopt.should.have.been.calledWith(zmq.ZMQ_TCP_KEEPALIVE, 1)
      })
    })
  })

  // Exercises `connect()`: which endpoints the two streams connect to, topic
  // subscription on the sub socket, and the snapshot sync exchange
  // (SYNCSTART sent, SYNC / SYNCEND received) on the dealer socket.
  describe('.connect', function () {
    // Builds a subscriber from the per-test config, registers a no-op listener
    // for every topic in `topics` (may be empty) and returns the promise
    // produced by connect().
    function connectListeningTo (topics) {
      var target = subscriber.getInstance(config)
      topics.forEach(function (topic) {
        target.on(topic, _.noop)
      })
      return target.connect()
    }

    it('should log connect information', function () {
      return connectListeningTo([]).then(function () {
        log.info.should.have.been.calledWith(
          'Subscriber opened the folowing streams\n\tsnapshot: %s\n\tsubscriber: %s',
          'tcp://127.0.0.1:5556',
          'tcp://127.0.0.1:5557'
        )
      })
    })

    describe('open a snapshot stream', function () {
      it('should connect socket', function () {
        return connectListeningTo([]).then(function () {
          dealerStub.connect.should.have.been.called
        })
      })

      it('should connect socket to default configuration', function () {
        return connectListeningTo([]).then(function () {
          dealerStub.connect.should.have.been.calledWith('tcp://127.0.0.1:5556')
        })
      })

      it('should connect socket to subscriber config port - 1', function () {
        config.address = 'tcp://127.0.0.1:6667'
        return connectListeningTo([]).then(function () {
          dealerStub.connect.should.have.been.calledWith('tcp://127.0.0.1:6666')
        })
      })

      it('should connect socket to snapshot config', function () {
        config.snapshot = 'tcp://127.0.0.1:7777'
        return connectListeningTo([]).then(function () {
          dealerStub.connect.should.have.been.calledWith('tcp://127.0.0.1:7777')
        })
      })
    })

    describe('open a subscriber stream', function () {
      it('should connect socket', function () {
        return connectListeningTo([]).then(function () {
          subStub.connect.should.have.been.called
        })
      })

      it('should connect socket to default configuration', function () {
        return connectListeningTo([]).then(function () {
          subStub.connect.should.have.been.calledWith('tcp://127.0.0.1:5557')
        })
      })

      it('should connect socket to address configuration', function () {
        config.address = 'tcp://127.0.0.1:7777'
        return connectListeningTo([]).then(function () {
          subStub.connect.should.have.been.calledWith('tcp://127.0.0.1:7777')
        })
      })

      it('should subscribe to topics', function () {
        return connectListeningTo(['/test/1/topic']).then(function () {
          subStub.subscribe.should.have.been.called
        })
      })

      it('should subscribe to topics registered', function () {
        return connectListeningTo(['/test/1/topic', '/test/2/topic']).then(function () {
          subStub.subscribe.should.have.been.calledWith('/test/1/topic')
          subStub.subscribe.should.have.been.calledWith('/test/2/topic')
        })
      })

      it('should subscribe topic before connect to stream', function () {
        return connectListeningTo(['/test/1/topic']).then(function () {
          subStub.subscribe.should.have.been.calledBefore(subStub.connect)
        })
      })
    })

    describe('do snapshot a sync', function () {
      // Frames of one SYNC event on '/test/1/topic' carrying 'event-data'.
      // A new array is built on every call so tests never share frames.
      function syncEventFrames () {
        return toDataFrames([
          'SYNC', '/test/1/topic', 1, 'producer',
          '2016-11-18T14:36:49.007Z', 'uuid', 'event-data'
        ])
      }

      // Frames of a SYNCEND command for '/test/1/topic' with value 1.
      function syncEndFrames () {
        return toFrames(['SYNCEND', '/test/1/topic', 1])
      }

      // Replaces `dealerStub.on` so the function the subscriber registers on
      // the dealer socket is captured, registers `listener` on `topic`,
      // connects, and then invokes the captured function with `frames`.
      // `beforeDelivery` (optional) runs after connect resolved and right
      // before the frames are delivered, e.g. to reset a log stub's history.
      function deliverSnapshotFrames (topic, listener, frames, beforeDelivery) {
        var handler
        dealerStub.on = function (msg, fn) { handler = fn }

        var target = subscriber.getInstance(config)
        target.on(topic, listener)

        return target.connect().then(function () {
          if (beforeDelivery) { beforeDelivery() }
          return handler.apply(null, frames)
        })
      }

      it('should log that syncronization started', function () {
        var target = subscriber.getInstance(config)
        target.on('/test/1/topic', _.noop)
        // drop the log entries produced while creating/registering
        log.info.resetHistory()
        return target.connect().then(function () {
          log.info.should.have.been.calledWith(
            match.any,
            'Started subscriber sync for snapshot=%s of topics %s',
            0, '/test/1/topic'
          )
        })
      })

      it('should send command with valid frame amount', function () {
        return connectListeningTo(['/test/1/topic']).then(function () {
          dealerStub.send.should.have.been.calledWith(
            match.has('length', 3)
          )
        })
      })

      it('should send SYNCSTART command', function () {
        return connectListeningTo(['/test/1/topic']).then(function () {
          dealerStub.send.should.have.been.calledWith(
            match.has('0', 'SYNCSTART')
          )
        })
      })

      it('should send SYNCSTART command containing the registered topics', function () {
        return connectListeningTo(['/test/1/topic', '/test/1/topic2']).then(function () {
          dealerStub.send.should.have.been.calledWith(
            match.has('1', '/test/1/topic,/test/1/topic2')
          )
        })
      })

      it('should send SYNCSTART command containing last sequence', function () {
        config.store.instance.insert(99, 'raw-data')
        return connectListeningTo(['/test/1/topic']).then(function () {
          dealerStub.send.should.have.been.calledWith(
            match.has('2', 99)
          )
        })
      })

      it('should send SYNCSTART command containing last sequence to zero when not available', function () {
        // connect exactly once: the earlier version also fired a second,
        // un-awaited connect() whose outcome was never observed
        return connectListeningTo(['/test/1/topic']).then(function () {
          dealerStub.send.should.have.been.calledWith(
            match.has('2', 0)
          )
        })
      })

      it('should trigger snapshot events', function () {
        var spy = sinon.spy()
        return deliverSnapshotFrames('/test/1/topic', spy, syncEventFrames())
          .then(function () {
            spy.should.have.been.calledWith('event-data')
          })
      })

      it('should trigger snapshot events with partial topic match', function () {
        var spy = sinon.spy()
        return deliverSnapshotFrames('/test/1', spy, syncEventFrames())
          .then(function () {
            spy.should.have.been.calledWith('event-data')
          })
      })

      it('should not trigger snapshot events without topic match', function () {
        var spy = sinon.spy()
        // Built with toDataFrames like every other SYNC event in this suite
        // (this test used toFrames before), so the only thing that keeps the
        // listener from firing is the topic mismatch.
        return deliverSnapshotFrames('/test/2', spy, syncEventFrames())
          .then(function () {
            spy.should.not.have.been.called
          })
      })

      it('should log warning information about topic mismatch', function () {
        var spy = sinon.spy()
        return deliverSnapshotFrames('/test/2', spy, syncEventFrames(), function () {
          log.warn.resetHistory()
        })
          .then(function () {
            log.warn.should.have.been.calledWith(
              'Received a event without topic match for topic: %s', '/test/1/topic'
            )
          })
      })

      it('should handle a SYNCEND command', function () {
        var spy = sinon.spy()
        return deliverSnapshotFrames('/test/1/topic', spy, syncEndFrames())
          .then(function () {
            spy.should.not.have.been.called
          })
      })

      it('should log snapshot sync completion', function () {
        return deliverSnapshotFrames('/test/1/topic', _.noop, syncEndFrames(), function () {
          log.info.resetHistory()
        })
          .then(function () {
            log.info.should.have.been.calledWith(
              'Finished subscriber sync snapshot=%s for topics %s',
              1, '/test/1/topic'
            )
          })
      })

      it('should attach subscriber socket to event loop', function () {
        return deliverSnapshotFrames('/test/1/topic', _.noop, syncEndFrames())
          .then(function () {
            subStub.ref.should.have.been.called
          })
      })

      it('should attach subscriber socket only after SYNCEND command', function () {
        // only a SYNC event is delivered here, no SYNCEND
        return deliverSnapshotFrames('/test/1/topic', _.noop, syncEventFrames())
          .then(function () {
            subStub.ref.should.not.have.been.called
          })
      })
    })
  })

  // Exercises `on(topic, listener)`: registration, its log entry, and the
  // delivery of events arriving on the sub socket to registered listeners.
  describe('.on', function () {
    // Frames of one event on '/test/1/topic' with the given sequence and
    // payload. A new array is built on every call.
    function eventFrames (sequence, data) {
      return toDataFrames([
        '/test/1/topic', sequence, 'producer',
        '2016-11-18T14:36:49.007Z', 'uuid', data
      ])
    }

    // Replaces `subStub.on` so the function the subscriber registers on the
    // sub socket is captured, registers every listener in `listeners` on
    // `topic`, connects, and invokes the captured function with `frames`.
    // When `syncEndFrames` is given, `dealerStub.on` is captured as well and
    // those frames are fed to the dealer handler before the event is delivered.
    function deliverEvent (topic, listeners, frames, syncEndFrames) {
      var handler, dealerHandler

      if (syncEndFrames) {
        dealerStub.on = function (msg, fn) { dealerHandler = fn }
      }
      subStub.on = function (msg, fn) { handler = fn }

      var target = subscriber.getInstance(config)
      listeners.forEach(function (listener) {
        target.on(topic, listener)
      })

      return target.connect().then(function () {
        if (syncEndFrames) {
          return dealerHandler.apply(null, syncEndFrames)
        }
      }).then(function () {
        return handler.apply(null, frames)
      })
    }

    it('should register topic', function () {
      var target = subscriber.getInstance(config)
      // the test used to assert nothing; make the expectation explicit
      var register = function () {
        target.on('/test/1/topic', _.noop)
      }
      register.should.not.throw()
    })

    it('should log information about topic registration', function () {
      var target = subscriber.getInstance(config)
      log.info.resetHistory()
      target.on('/test/1/topic', _.noop)
      log.info.should.have.been.calledWith(
        'Subscriber listens to the folowing topic: \'%s\'',
        '/test/1/topic'
      )
    })

    it('should receive events from subscribed topics', function () {
      var spy = sinon.spy()
      return deliverEvent('/test/1/topic', [spy], eventFrames(1, 'event-data'))
        .then(function () {
          spy.should.have.been.calledWith('event-data')
        })
    })

    it('should receive events from subscribed topics with partial match', function () {
      var spy = sinon.spy()
      return deliverEvent('/test/1', [spy], eventFrames(1, 'event-data'))
        .then(function () {
          spy.should.have.been.calledWith('event-data')
        })
    })

    it('should trigger events with sequence higher snapshot', function () {
      var spy = sinon.spy()
      var syncEndFrames = toFrames(['SYNCEND', '/test/1/topic', 1])
      return deliverEvent('/test/1', [spy], eventFrames(2, 'event-data'), syncEndFrames)
        .then(function () {
          spy.should.have.been.calledWith('event-data')
        })
    })

    it('should not trigger events with sequence bellow snapshot', function () {
      var spy = sinon.spy()
      var syncEndFrames = toFrames(['SYNCEND', '/test/1/topic', 1])
      // connect exactly once: the earlier version also fired a second,
      // un-awaited connect() whose outcome was never observed
      return deliverEvent('/test/1', [spy], eventFrames(1, 'event-data'), syncEndFrames)
        .then(function () {
          spy.should.not.have.been.called
        })
    })

    it('should receive events from subscribed topics to all registered handlers', function () {
      var firstSpy = sinon.spy()
      var secondSpy = sinon.spy()
      return deliverEvent('/test/1/topic', [firstSpy, secondSpy], eventFrames(1, 'event-data'))
        .then(function () {
          firstSpy.should.have.been.calledWith('event-data')
          secondSpy.should.have.been.calledWith('event-data')
        })
    })

    it('should receive complex data properly decoded', function () {
      var spy = sinon.spy()
      var frames = eventFrames(1, {mydata: {isParsed: ['properly']}})
      return deliverEvent('/test/1/topic', [spy], frames)
        .then(function () {
          spy.should.have.been.calledWith({mydata: {isParsed: ['properly']}})
        })
    })
  })

  // Exercises `close()` on a subscriber that has finished connecting.
  describe('.close', function () {
    // Builds a subscriber from the per-test config, connects it and resolves
    // with the instance. Returning this promise from a test makes mocha wait
    // for connect() and report a rejection instead of leaving it unhandled
    // (the tests previously started connect() without waiting for it).
    function connectedSubscriber () {
      var target = subscriber.getInstance(config)
      return target.connect().then(function () {
        return target
      })
    }

    it('should close subscriber stream', function () {
      return connectedSubscriber().then(function (target) {
        target.close()
        subStub.close.should.have.been.called
      })
    })

    it('should close snapshot stream', function () {
      return connectedSubscriber().then(function (target) {
        target.close()
        dealerStub.close.should.have.been.called
      })
    })

    it('should log close information', function () {
      return connectedSubscriber().then(function (target) {
        // only the entries written by close() are of interest
        log.info.resetHistory()
        target.close()
        log.info.should.have.been.calledWith('Closed subscriber streams')
      })
    })
  })
})