kapost/circuitry

View on GitHub
lib/circuitry/subscriber.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'retries'
require 'timeout'
require 'circuitry/concerns/async'
require 'circuitry/services/sqs'
require 'circuitry/message'
require 'circuitry/queue'

module Circuitry
  class SubscribeError < StandardError; end

  class Subscriber
    include Concerns::Async
    include Services::SQS

    attr_reader :queue, :timeout, :wait_time, :batch_size, :lock

    DEFAULT_OPTIONS = {
      lock: true,
      async: false,
      timeout: 15,
      wait_time: 10,
      batch_size: 10
    }.freeze

    CONNECTION_ERRORS = [
      Aws::SQS::Errors::ServiceError
    ].freeze

    def initialize(options = {})
      options = DEFAULT_OPTIONS.merge(options)

      self.subscribed = false
      self.queue = Queue.find(Circuitry.subscriber_config.queue_name).url

      %i[lock async timeout wait_time batch_size].each do |sym|
        send(:"#{sym}=", options[sym])
      end

      trap_signals
    end

    def subscribe(&block)
      raise ArgumentError, 'block required' if block.nil?
      raise SubscribeError, 'AWS configuration is not set' unless can_subscribe?

      logger.info("Subscribing to queue: #{queue}")

      self.subscribed = true
      poll(&block)
      self.subscribed = false

      logger.info("Unsubscribed from queue: #{queue}")
    rescue *CONNECTION_ERRORS => e
      logger.error("Connection error to queue: #{queue}: #{e}")
      raise SubscribeError, e.message
    end

    def subscribed?
      subscribed
    end

    def self.async_strategies
      super - [:batch]
    end

    def self.default_async_strategy
      Circuitry.subscriber_config.async_strategy
    end

    protected

    attr_writer :queue, :timeout, :wait_time, :batch_size
    attr_accessor :subscribed

    def lock=(value)
      value = case value
              when true then Circuitry.subscriber_config.lock_strategy
              when false then Circuitry::Locks::NOOP.new
              when Circuitry::Locks::Base then value
              else raise ArgumentError, lock_value_error(value)
              end

      @lock = value
    end

    private

    def lock_value_error(value)
      opts = Circuitry::Locks::Base
      "Invalid value `#{value}`, must be one of `true`, `false`, or instance of `#{opts}`"
    end

    def trap_signals
      trap('SIGINT') do
        if subscribed?
          Thread.new { logger.info('Interrupt received, unsubscribing from queue...') }
          self.subscribed = false
        end
      end
    end

    def poll(&block)
      poller = Aws::SQS::QueuePoller.new(queue, client: sqs)

      poller.before_request do |_stats|
        throw :stop_polling unless subscribed?
      end

      poller.poll(max_number_of_messages: batch_size, wait_time_seconds: wait_time, skip_delete: true) do |messages|
        messages = [messages] unless messages.is_a?(Array)
        process_messages(Array(messages), &block)
        Circuitry.flush
      end
    end

    def process_messages(messages, &block)
      if async?
        process_messages_asynchronously(messages, &block)
      else
        process_messages_synchronously(messages, &block)
      end
    end

    def process_messages_asynchronously(messages, &block)
      messages.each { |message| process_asynchronously { process_message(message, &block) } }
    end

    def process_messages_synchronously(messages, &block)
      messages.each { |message| process_message(message, &block) }
    end

    def process_message(message, &block)
      message = Message.new(message)

      logger.debug("Processing message #{message.id}")

      handled = try_with_lock(message.id) do
        handle_message_with_middleware(message, &block)
      end

      logger.info("Ignoring duplicate message #{message.id}") unless handled
    rescue => e
      logger.error("Error processing message #{message.id}: #{e}")
      error_handler.call(e) if error_handler
    end

    def handle_message_with_middleware(message, &block)
      middleware.invoke(message.topic.name, message.body) do
        handle_message(message, &block)
        delete_message(message)
      end
    end

    def try_with_lock(id)
      if lock.soft_lock(id)
        begin
          yield
        rescue => e
          lock.unlock(id)
          raise e
        end

        lock.hard_lock(id)
        true
      else
        false
      end
    end

    # TODO: Don't use ruby timeout.
    # http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
    def handle_message(message, &block)
      Timeout.timeout(timeout) do
        block.call(message.body, message.topic.name)
      end
    rescue => e
      logger.error("Error handling message #{message.id}: #{e}")
      raise e
    end

    def delete_message(message)
      logger.debug("Removing message #{message.id} from queue")
      sqs.delete_message(queue_url: queue, receipt_handle: message.receipt_handle)
    end

    def logger
      Circuitry.subscriber_config.logger
    end

    def error_handler
      Circuitry.subscriber_config.error_handler
    end

    def can_subscribe?
      return true if Circuitry.subscriber_config.use_iam_profile

      Circuitry.subscriber_config.aws_options.values.all? do |value|
        !value.nil? && !value.empty?
      end
    end

    def middleware
      Circuitry.subscriber_config.middleware
    end
  end
end