digital-science/baton

View on GitHub
lib/baton/consumer_manager.rb

Summary

Maintainability
A
0 mins
Test Coverage
require "baton/logging"

module Baton
  # Public: Connects a consumer to a pair of exchanges. Incoming payloads from
  # the input exchange are handed to the consumer, and every message the
  # consumer notifies back (via #update) is logged and published to the output
  # exchange as JSON.
  class ConsumerManager
    include Baton::Logging
    attr_accessor :consumer, :channel, :exchange_in, :exchange_out

    # Public: Initialize a ConsumerManager, register it as an observer of the
    # consumer and hand the consumer a reference back to this manager.
    #
    # consumer     - An instance of Baton::Consumer. It must respond to
    #                add_observer and consumer_manager=.
    # channel      - An AMQP channel
    # exchange_in  - An input exchange
    # exchange_out - An output exchange
    def initialize(consumer, channel, exchange_in, exchange_out)
      @consumer = consumer
      @channel = channel
      @exchange_in = exchange_in
      @exchange_out = exchange_out
      @consumer.add_observer(self)
      @consumer.consumer_manager = self
    end

    # Public: Creates an unnamed, exclusive, auto-deleted queue, binds it to the
    # input exchange using the consumer's routing key and subscribes
    # handle_message as the callback for delivered messages.
    #
    # Returns nothing meaningful (the value of the final logging call).
    def start
      queue = channel.queue("", :exclusive => true, :auto_delete => true)
      queue.bind(exchange_in, :routing_key => consumer.routing_key)
      queue.subscribe(&method(:handle_message))
      logger.info "Bind queue with routing key '#{consumer.routing_key}' to exchange '#{exchange_in.name}', waiting for messages..."
    end

    # Public: Triggered whenever a message is received. Logs the payload and the
    # metadata's content type at debug level and forwards the payload (only the
    # payload, not the metadata) to the consumer's handle_message.
    #
    # metadata - A metadata structure that responds to content_type
    #            (such as an OpenStruct)
    # payload  - A JSON message
    #
    # Examples
    #
    #   handle_message(metadata, "{\"message\":\"a message\",\"type\":\"a type\"}")
    #
    # Returns whatever the consumer's handle_message returns.
    def handle_message(metadata, payload)
      logger.debug "Received #{payload}, content_type = #{metadata.content_type}"
      consumer.handle_message(payload)
    end

    # Public: Method that is triggered when a consumer notifies with a message.
    # It logs the message (at error level when it is a Hash whose :type is
    # "error", at info level otherwise) and writes it to the output exchange
    # as JSON.
    #
    # message - A general message (Hash, String, etc). Only Hash messages are
    #           inspected for a :type entry; anything else is treated as an
    #           untyped message.
    #
    # Examples
    #
    #   update("A message")
    #   update(:type => "error", :message => "something broke")
    #
    # Returns whatever exchange_out.publish returns.
    def update(message)
      case message_type(message)
      when "error"
        logger.error message
      else
        logger.info message
      end
      exchange_out.publish(message.to_json)
    end

    private

    # Internal: Extracts the :type of a notified message.
    #
    # Non-Hash messages (for example a plain String) do not respond to
    # fetch(:type), so they are given the empty type instead of raising.
    #
    # message - A general message (Hash, String, etc)
    #
    # Returns the value stored under :type, or "" when it is absent or the
    # message is not a Hash.
    def message_type(message)
      message.is_a?(Hash) ? message.fetch(:type) { "" } : ""
    end
  end
end