lib/eventq/eventq_aws/aws_eventq_client.rb

Summary

Maintainability
A
0 mins
Test Coverage
A
98%
module EventQ
  module Amazon
    # Implements a general interface to raise an event
    # EventQ::RabbitMq::EventQClient is the sister-class which does the same for RabbitMq
    class EventQClient

      def initialize(options)

        if options[:client] == nil
          raise ':client (QueueClient) must be specified.'.freeze
        end

        @client = options[:client]

        @serialization_manager = EventQ::SerializationProviders::Manager.new
        @signature_manager = EventQ::SignatureProviders::Manager.new

        # this hash is used to record known event types:
        # key = event_type / name
        # value = topic arn
        @known_event_types = {}
      end

      # Returns true if the event has already been registerd, or false
      # otherwise.
      #
      # @param [String] event_type
      # @param [String] region
      #
      # @return [Boolean]
      def registered?(event_type, region = nil)
        topic_key = "#{region}:#{event_type}"
        @known_event_types.key?(topic_key)
      end

      # Registers the event event_type and returns its topic arn.
      #
      # @param [String] event_type
      # @param [String] region
      #
      # @return [String]
      def register_event(event_type, region = nil)
        topic_key = "#{region}:#{event_type}"
        return @known_event_types[topic_key] if registered?(event_type, region)

        topic_arn = @client.sns_helper(region).create_topic_arn(event_type, region)
        @known_event_types[topic_key] = topic_arn
        topic_arn
      end

      def publish(topic:, event:, context: {}, region: nil)
        raise_event(topic, event, context, region)
      end

      def raise_event(event_type, event, context = {}, region = nil)
        topic_arn = register_event(event_type, region)

        with_prepared_message(event_type, event, context) do |message|
          response = @client.sns(region).publish(
            topic_arn: topic_arn,
            message: message,
            subject: event_type
          )

          EventQ.logger.debug do
            "[#{self.class} #raise_event] - Published to SNS with topic_arn: #{topic_arn} | event_type: #{event_type} | Message: #{message}"
          end

          response
        end
      end

      def raise_event_in_queue(event_type, event, queue, delay, context = {})
        queue_url = @client.sqs_helper.get_queue_url(queue)
        with_prepared_message(event_type, event, context) do |message|
          response = @client.sqs.send_message(
            queue_url: queue_url,
            message_body: sqs_message_body_for(message),
            delay_seconds: delay
          )

          EventQ.logger.debug do
            "[#{self.class} #raise_event_in_queue] - Raised event to SQS queue: #{queue_url} | event_type: #{event_type} | Message: #{message}"
          end

          response
        end
      end

      def new_message
        EventQ::QueueMessage.new
      end

      private

      def with_prepared_message(event_type, event, context)
        qm = new_message
        qm.content = event
        qm.type = event_type
        qm.context = context
        qm.content_type = event.class.to_s
        if event.respond_to? :Correlation
          qm.correlation_trace_id = event.Correlation['Trace']
          qm.Correlation = event.Correlation
        end

        if EventQ::Configuration.signature_secret != nil
          provider = @signature_manager.get_provider(EventQ::Configuration.signature_provider)
          qm.signature = provider.write(message: qm, secret: EventQ::Configuration.signature_secret)
        end

        message = serialized_message(qm)

        response = yield(message)

        EventQ.log(:debug, "[#{self.class}] - Raised event. Message: #{message} | Type: #{event_type}.")

        response.message_id
      end

      def serialized_message(queue_message)
        serialization_provider = @serialization_manager.get_provider(EventQ::Configuration.serialization_provider)

        serialization_provider.serialize(queue_message)
      end

      def sqs_message_body_for(payload_message)
        JSON.dump(EventQ::Amazon::QueueWorker::MESSAGE => payload_message)
      end
    end
  end
end