hicknhack-software/rails-disco

View on GitHub
active_event/lib/active_event/event_server.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'singleton'
require 'bunny'
module ActiveEvent
  # Process-wide singleton that publishes events through a Bunny connection
  # and reacts to resend requests by starting or updating a ReplayServer.
  #
  # All broker objects (connection, channel, exchanges, queue) are created
  # lazily on first access and memoized on the singleton instance.
  class EventServer
    include Singleton

    # Options handed to {.start}. This class reads the +:event_connection+
    # and +:event_exchange+ keys and passes the whole object on to
    # ReplayServer.start.
    attr_accessor :options

    class << self
      # Serializes +event+ and publishes it on the singleton's event exchange.
      #
      # The event's class name is sent as the message +type+ and the result of
      # +event.store_infos+ as the message +headers+.
      #
      # @param event [#to_json, #store_infos] the event to publish
      def publish(event)
        event_type = event.class.name
        payload = event.to_json
        instance.event_exchange.publish(payload, type: event_type, headers: event.store_infos)
        LOGGER.debug("Published #{event_type} with #{payload}")
      end

      # Stores +options+ on the singleton and starts it.
      #
      # @param options [Hash] see {#options}
      # @return whatever {#start} returns
      def start(options)
        server = instance
        server.options = options
        server.start
      end
    end

    # Starts the broker connection and subscribes to resend requests.
    #
    # Any StandardError is logged (message first, then the backtrace) and
    # re-raised unchanged to the caller.
    def start
      event_connection.start
      listen_for_resend_requests
    rescue StandardError => error
      LOGGER.error(error.message)
      LOGGER.error(error.backtrace.join("\n"))
      raise
    end

    # Triggers a replay of the events after +id+.
    #
    # While a previously started replay thread is still alive, the running
    # ReplayServer is only told about the new id; otherwise a new background
    # thread is spawned that runs ReplayServer.start.
    #
    # @param id [Integer] id after which events should be resent
    def resend_events_after(id)
      replay_running = @replay_server_thread && @replay_server_thread.alive?
      return ReplayServer.update(id) if replay_running

      @replay_server_thread = Thread.new do
        # The replay thread asks for a priority of -10 before doing any work.
        Thread.current.priority = -10
        ReplayServer.start(options, id)
      end
    end

    # Subscribes to the resend request queue; the body of every delivered
    # message is passed to {#resend_request_received}.
    def listen_for_resend_requests
      resend_request_queue.subscribe do |_delivery_info, _properties, payload|
        resend_request_received(payload)
      end
    end

    # Handles one resend request.
    #
    # @param id [#to_i] requested id as delivered in the message body; it is
    #   converted with +to_i+ before the replay is triggered
    def resend_request_received(id)
      LOGGER.debug("received resend request with id #{id}")
      resend_events_after(id.to_i)
    end

    # Memoized Bunny connection. The connection URL is built from the
    # +:event_connection+ option via URI::Generic.build.
    # (The backing instance variable is named +@event_server+.)
    def event_connection
      @event_server ||= begin
        connection_url = URI::Generic.build(options[:event_connection]).to_s
        Bunny.new(connection_url)
      end
    end

    # Memoized channel created on {#event_connection}.
    def event_channel
      @event_channel ||= event_connection.create_channel
    end

    # Memoized fanout exchange named by the +:event_exchange+ option.
    def event_exchange
      @event_exchange ||= event_channel.fanout(options[:event_exchange])
    end

    # Memoized direct exchange whose name is the +:event_exchange+ option
    # prefixed with "resend_request_".
    def resend_request_exchange
      @resend_request_exchange ||= begin
        exchange_name = "resend_request_#{options[:event_exchange]}"
        event_channel.direct(exchange_name)
      end
    end

    # Memoized queue (declared with an empty name and +auto_delete: true+)
    # bound to {#resend_request_exchange} with routing key "resend_request".
    def resend_request_queue
      @resend_request_queue ||= begin
        queue = event_channel.queue('', auto_delete: true)
        queue.bind(resend_request_exchange, routing_key: 'resend_request')
      end
    end
  end
end