ManageIQ/manageiq-messaging

View on GitHub
lib/manageiq/messaging/stomp/common.rb

Summary

Maintainability
A
25 mins
Test Coverage
B
88%
module ManageIQ
  module Messaging
    module Stomp
      module Common
        require 'manageiq/messaging/common'
        include ManageIQ::Messaging::Common

        private

        # Encodes +body+ via encode_body, hands it to publish together with
        # +headers+, then writes an info-level log entry describing the message.
        #
        # address - destination passed straight through to publish
        # body    - payload; its #inspect form (filtered through payload_log) is logged
        # headers - header hash given to both encode_body and publish
        #
        # Returns whatever logger.info returns.
        def raw_publish(address, body, headers)
          # Encode first: encode_body receives the same headers object that is
          # published and logged afterwards.
          encoded = encode_body(headers, body)
          publish(address, encoded, headers)

          logged_payload = payload_log(body.inspect)
          logger.info("Published to address(#{address}), msg(#{logged_payload}), headers(#{headers.inspect})")
        end

        # Builds the address and header hash for publishing a queue message.
        #
        # options - Hash; the keys read here are:
        #           :service    - used in the address
        #           :affinity   - address suffix, defaults to 'none'
        #           :headers    - optional custom headers; keys listed in
        #                         internal_header_keys are dropped before merging
        #           :expires_on - sets :expires as to_i * 1000
        #           :deliver_on - sets :AMQ_SCHEDULED_TIME as to_i * 1000
        #           :priority   - sets :priority
        #           :group_name - sets :_AMQ_GROUP_ID
        #
        # Returns a two-element array: [address, headers].
        def queue_for_publish(options)
          affinity = options[:affinity] || 'none'
          address = "queue/#{options[:service]}.#{affinity}"

          headers = {:"destination-type" => 'ANYCAST', :persistent => true}

          # Guard on the value rather than key presence, so that an explicit
          # :headers => nil means "no custom headers" instead of raising
          # NoMethodError on nil.
          custom_headers = options[:headers]
          headers.merge!(custom_headers.except(*internal_header_keys)) if custom_headers

          # presumably epoch seconds converted to milliseconds -- confirm against the broker
          headers[:expires]            = options[:expires_on].to_i * 1000 if options[:expires_on]
          headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
          headers[:priority]           = options[:priority] if options[:priority]
          headers[:_AMQ_GROUP_ID]      = options[:group_name] if options[:group_name]

          [address, headers]
        end

        # Builds the queue name and subscription headers for a queue subscriber.
        #
        # options - Hash; reads :service and :affinity (the latter defaults to 'none').
        #
        # Returns a two-element array: [queue_name, headers].
        def queue_for_subscribe(options)
          suffix = options[:affinity] || 'none'

          [
            "queue/#{options[:service]}.#{suffix}",
            {:"subscription-type" => 'ANYCAST', :ack => 'client'}
          ]
        end

        # Builds the address and header hash for publishing a topic message.
        #
        # options - Hash; the keys read here are:
        #           :service    - used in the address
        #           :headers    - optional custom headers; keys listed in
        #                         internal_header_keys are dropped before merging
        #           :expires_on - sets :expires as to_i * 1000
        #           :deliver_on - sets :AMQ_SCHEDULED_TIME as to_i * 1000
        #           :priority   - sets :priority
        #
        # Returns a two-element array: [address, headers].
        def topic_for_publish(options)
          address = "topic/#{options[:service]}"

          headers = {:"destination-type" => 'MULTICAST', :persistent => true}

          # Guard on the value rather than key presence, so that an explicit
          # :headers => nil means "no custom headers" instead of raising
          # NoMethodError on nil.
          custom_headers = options[:headers]
          headers.merge!(custom_headers.except(*internal_header_keys)) if custom_headers

          # presumably epoch seconds converted to milliseconds -- confirm against the broker
          headers[:expires]            = options[:expires_on].to_i * 1000 if options[:expires_on]
          headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
          headers[:priority]           = options[:priority] if options[:priority]

          [address, headers]
        end

        # Builds the topic name and subscription headers for a topic subscriber.
        #
        # options - Hash; reads :service and, optionally, :persist_ref, which is
        #           copied into the :"durable-subscription-name" header when truthy.
        #
        # Returns a two-element array: [queue_name, headers].
        def topic_for_subscribe(options)
          subscription_headers = {:"subscription-type" => 'MULTICAST', :ack => 'client'}

          durable_name = options[:persist_ref]
          subscription_headers[:"durable-subscription-name"] = durable_name if durable_name

          ["topic/#{options[:service]}", subscription_headers]
        end

        # Publishes +result+ to the response queue for +service+.
        #
        # The destination is obtained from queue_for_publish using
        # "<service>.response" as the service and +correlation_ref+ as the
        # affinity; the same reference is added as the :correlation_id header.
        #
        # service         - name used to build the response service name
        # correlation_ref - used as both the affinity and the :correlation_id header
        # result          - payload; a nil/false result is sent as an empty string
        #
        # Returns whatever raw_publish returns.
        def send_response(service, correlation_ref, result)
          address, base_headers = queue_for_publish(
            :service  => "#{service}.response",
            :affinity => correlation_ref
          )

          payload = result || ''
          headers = base_headers.merge(:correlation_id => correlation_ref)

          raw_publish(address, payload, headers)
        end

        # Header keys that queue_for_publish and topic_for_publish strip from
        # caller-supplied :headers before merging them.
        #
        # Returns a new Array of Symbols on every call.
        def internal_header_keys
          %i[destination-type persistent expires AMQ_SCHEDULED_TIME priority _AMQ_GROUP_ID]
        end

        # Subscribes to the response queue for +service+ / +correlation_ref+ and
        # yields each received message's decoded body to the caller's block.
        #
        # The queue is obtained from queue_for_subscribe using
        # "<service>.response" as the service and +correlation_ref+ as the affinity.
        #
        # service         - name used to build the response service name
        # correlation_ref - affinity identifying the response queue
        #
        # Yields the value returned by decode_body for the message.
        # Returns whatever subscribe returns.
        def receive_response(service, correlation_ref)
          queue_name, response_headers = queue_for_subscribe(
            :service  => "#{service}.response",
            :affinity => correlation_ref
          )

          subscribe(queue_name, response_headers) do |message|
            # The ack sits outside the begin/ensure: if it raises, unsubscribe
            # is not called.
            ack(message)
            begin
              decoded = decode_body(message.headers, message.body)
              yield decoded
            ensure
              # Runs even when decoding or the caller's block raises.
              unsubscribe(queue_name)
            end
          end
        end
      end
    end
  end
end