craigisrael/daemon_objects

View on GitHub
lib/daemon_objects/amqp/runner.rb

Summary

Maintainability
A
35 mins
Test Coverage
module DaemonObjects::Amqp::Runner
  attr_accessor :endpoint, :queue, :prefetch, :worker_class,
    :retry_wait_time

  def arguments
    @arguments ||= {}
  end

  def retry_wait_time
    @retry_wait_time || 5
  end

  def run
    logger.info "Preparing to start the AMQP watcher."

    connection = Bunny.new(endpoint) 
    connection.start

    Signal.trap("INT") do
      logger.info "Received signal 'INT'.  Exiting process"
      connection.close { EventMachine.stop } 
      exit
    end

    logger.info "Starting up the AMQP watcher."

    channel  = connection.create_channel

    # handle legacy boolean behavior
    self.prefetch = 1 if prefetch == true

    channel.prefetch(prefetch) if prefetch

    worker   = worker_class.new(
      channel, 
      get_consumer, 
      {
        :queue_name => queue,
        :logger     => logger,
        :arguments  => arguments
      })
    worker.start

    logger.info "AMQP worker started"

  rescue Bunny::InternalError, Bunny::TCPConnectionFailed => e
    logger.error(e) && e.backtrace.join("\n")
    wait && retry
  end

  def wait
    retry_message = "* Retrying connection in #{retry_wait_time} seconds .... *"
    sleep(retry_wait_time)
    logger.info("\n")
    logger.info("*" * retry_message.length)
    logger.info(retry_message)
    logger.info("*" * retry_message.length)
    logger.info("\n")
  end
end