kapost/circuitry

View on GitHub
lib/circuitry/publisher.rb

Summary

Maintainability
A
25 mins
Test Coverage
require 'json'
require 'timeout'
require 'circuitry/concerns/async'
require 'circuitry/services/sns'

module Circuitry
  class PublishError < StandardError; end

  class SnsPublishError < StandardError
    def initialize(topic:, message:, exception:)
      msg = {
        error: "#{exception.class}: #{exception.message}",
        topic_arn: topic.arn,
        message: message
      }
      super msg.to_json
    end
  end

  class Publisher
    include Concerns::Async
    include Services::SNS

    DEFAULT_OPTIONS = {
      async: false,
      timeout: 15
    }.freeze

    CONNECTION_ERRORS = [
      ::Seahorse::Client::NetworkingError,
      ::Aws::SNS::Errors::InternalFailure
    ].freeze

    attr_reader :timeout

    def initialize(options = {})
      options = DEFAULT_OPTIONS.merge(options)

      self.async = options[:async]
      self.timeout = options[:timeout]
    end

    def publish(topic_name, object)
      raise ArgumentError, 'topic_name cannot be nil' if topic_name.nil?
      raise ArgumentError, 'object cannot be nil' if object.nil?
      raise PublishError, 'AWS configuration is not set' unless can_publish?

      message = object.to_json

      if async?
        process_asynchronously { publish_message(topic_name, message) }
      else
        publish_message(topic_name, message)
      end
    end

    def self.default_async_strategy
      Circuitry.publisher_config.async_strategy
    end

    protected

    def publish_message(topic_name, message)
      middleware.invoke(topic_name, message) do
        # TODO: Don't use ruby timeout.
        # http://www.mikeperham.com/2015/05/08/timeout-rubys-most-dangerous-api/
        Timeout.timeout(timeout) do
          logger.debug("Publishing message to #{topic_name}")

          handler = ->(error, attempt_number, _total_delay) do
            logger.warn("Error publishing attempt ##{attempt_number}: #{error.class} (#{error.message}); retrying...")
          end

          with_retries(max_tries: 3, handler: handler, rescue: CONNECTION_ERRORS, base_sleep_seconds: 0.05, max_sleep_seconds: 0.25) do
            topic = Topic.find(topic_name)
            sns_publish(topic: topic, message: message)
          end
        end
      end
    end

    attr_writer :timeout

    private

    def sns_publish(topic:, message:)
      sns.publish(topic_arn: topic.arn, message: message)

    rescue Aws::SNS::Errors::InvalidParameter => ex
      raise SnsPublishError.new(topic: topic, message:  message, exception: ex)
    end

    def logger
      Circuitry.publisher_config.logger
    end

    def can_publish?
      return true if Circuitry.publisher_config.use_iam_profile

      Circuitry.publisher_config.aws_options.values.all? do |value|
        !value.nil? && !value.empty?
      end
    end

    def middleware
      Circuitry.publisher_config.middleware
    end
  end
end