lib/manageiq/messaging/stomp/common.rb
module ManageIQ
module Messaging
module Stomp
module Common
require 'manageiq/messaging/common'
include ManageIQ::Messaging::Common
private
def raw_publish(address, body, headers)
publish(address, encode_body(headers, body), headers)
logger.info("Published to address(#{address}), msg(#{payload_log(body.inspect)}), headers(#{headers.inspect})")
end
def queue_for_publish(options)
affinity = options[:affinity] || 'none'
address = "queue/#{options[:service]}.#{affinity}"
headers = {:"destination-type" => 'ANYCAST', :persistent => true}
headers.merge!(options[:headers].except(*internal_header_keys)) if options.key?(:headers)
headers[:expires] = options[:expires_on].to_i * 1000 if options[:expires_on]
headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
headers[:priority] = options[:priority] if options[:priority]
headers[:_AMQ_GROUP_ID] = options[:group_name] if options[:group_name]
[address, headers]
end
def queue_for_subscribe(options)
affinity = options[:affinity] || 'none'
queue_name = "queue/#{options[:service]}.#{affinity}"
headers = {:"subscription-type" => 'ANYCAST', :ack => 'client'}
[queue_name, headers]
end
def topic_for_publish(options)
address = "topic/#{options[:service]}"
headers = {:"destination-type" => 'MULTICAST', :persistent => true}
headers.merge!(options[:headers].except(*internal_header_keys)) if options.key?(:headers)
headers[:expires] = options[:expires_on].to_i * 1000 if options[:expires_on]
headers[:AMQ_SCHEDULED_TIME] = options[:deliver_on].to_i * 1000 if options[:deliver_on]
headers[:priority] = options[:priority] if options[:priority]
[address, headers]
end
def topic_for_subscribe(options)
queue_name = "topic/#{options[:service]}"
headers = {:"subscription-type" => 'MULTICAST', :ack => 'client'}
headers[:"durable-subscription-name"] = options[:persist_ref] if options[:persist_ref]
[queue_name, headers]
end
def send_response(service, correlation_ref, result)
response_options = {
:service => "#{service}.response",
:affinity => correlation_ref
}
address, response_headers = queue_for_publish(response_options)
raw_publish(address, result || '', response_headers.merge(:correlation_id => correlation_ref))
end
def internal_header_keys
[:"destination-type", :persistent, :expires, :AMQ_SCHEDULED_TIME, :priority, :_AMQ_GROUP_ID]
end
def receive_response(service, correlation_ref)
response_options = {
:service => "#{service}.response",
:affinity => correlation_ref
}
queue_name, response_headers = queue_for_subscribe(response_options)
subscribe(queue_name, response_headers) do |msg|
ack(msg)
begin
yield decode_body(msg.headers, msg.body)
ensure
unsubscribe(queue_name)
end
end
end
end
end
end
end