lib/manageiq/messaging/stomp/queue.rb
module ManageIQ
module Messaging
module Stomp
module Queue
private
def publish_message_impl(options, &block)
address, headers = queue_for_publish(options)
headers[:sender] = options[:sender] if options[:sender]
headers[:message_type] = options[:message] if options[:message]
headers[:class_name] = options[:class_name] if options[:class_name]
headers[:correlation_id] = Time.now.to_i.to_s if block_given?
raw_publish(address, options[:payload] || '', headers)
return unless block_given?
receive_response(options[:service], headers[:correlation_id], &block)
end
def publish_messages_impl(messages)
messages.each { |msg_options| publish_message(msg_options) }
end
def subscribe_messages_impl(options)
queue_name, headers = queue_for_subscribe(options)
# for STOMP we can get message one at a time
subscribe(queue_name, headers) do |msg|
begin
ack(msg) if auto_ack?(options)
sender = msg.headers['sender']
message_type = msg.headers['message_type']
message_body = decode_body(msg.headers, msg.body)
client_headers = msg.headers.except(*internal_header_keys)
logger.info("Message received: queue(#{queue_name}), msg(#{payload_log(message_body)}), headers(#{msg.headers})")
messages = [
ManageIQ::Messaging::ReceivedMessage.new(sender, message_type, message_body, client_headers, msg, self)
]
result = yield messages
logger.info("Message processed")
correlation_ref = msg.headers['correlation_id']
if correlation_ref
result = result.first if result.kind_of?(Array)
send_response(options[:service], correlation_ref, result)
end
rescue => e
logger.error("Message processing error: #{e.message}")
logger.error(e.backtrace.join("\n"))
raise
end
end
end
end
end
end
end