lib/gorgon_bunny/lib/gorgon_bunny/channel.rb
# -*- coding: utf-8 -*-
require "thread"
require "monitor"
require "set"
require "gorgon_bunny/concurrent/atomic_fixnum"
require "gorgon_bunny/consumer_work_pool"
require "gorgon_bunny/exchange"
require "gorgon_bunny/queue"
require "gorgon_bunny/delivery_info"
require "gorgon_bunny/return_info"
require "gorgon_bunny/message_properties"
if defined?(JRUBY_VERSION)
require "gorgon_bunny/concurrent/linked_continuation_queue"
else
require "gorgon_bunny/concurrent/continuation_queue"
end
module GorgonBunny
# ## Channels in RabbitMQ
#
# To quote {http://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf AMQP 0.9.1 specification}:
#
# AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex
# a heavyweight TCP/IP connection into several light weight connections.
# This makes the protocol more “firewall friendly” since port usage is predictable.
# It also means that traffic shaping and other network QoS features can be easily employed.
# Channels are independent of each other and can perform different functions simultaneously
# with other channels, the available bandwidth being shared between the concurrent activities.
#
#
# ## Opening Channels
#
# Channels can be opened either via `GorgonBunny::Session#create_channel` (sufficient in the majority
# of cases) or by instantiating `GorgonBunny::Channel` directly:
#
# @example Using {GorgonBunny::Session#create_channel}:
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
#
# This will automatically allocate a channel id.
#
# ## Closing Channels
#
# Channels are closed via {GorgonBunny::Channel#close}. Channels that get a channel-level exception are
# closed, too. Closed channels can no longer be used. Attempts to use them will raise
# {GorgonBunny::ChannelAlreadyClosed}.
#
# @example
#
# ch = conn.create_channel
# ch.close
#
# ## Higher-level API
#
# GorgonBunny offers two sets of methods on {GorgonBunny::Channel}: known as higher-level and lower-level
# APIs, respectively. Higher-level API mimics {http://rubyamqp.info amqp gem} API where
# exchanges and queues are objects (instance of {GorgonBunny::Exchange} and {GorgonBunny::Queue}, respectively).
# Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are
# passed as strings (à la RabbitMQ Java client, {http://clojurerabbitmq.info Langohr} and Pika).
#
# ### Queue Operations In Higher-level API
#
# * {GorgonBunny::Channel#queue} is used to declare queues. The rest of the API is in {GorgonBunny::Queue}.
#
#
# ### Exchange Operations In Higher-level API
#
# * {GorgonBunny::Channel#topic} declares a topic exchange. The rest of the API is in {GorgonBunny::Exchange}.
# * {GorgonBunny::Channel#direct} declares a direct exchange.
# * {GorgonBunny::Channel#fanout} declares a fanout exchange.
# * {GorgonBunny::Channel#headers} declares a headers exchange.
# * {GorgonBunny::Channel#default_exchange}
# * {GorgonBunny::Channel#exchange} is used to declare exchanges with type specified as a symbol or string.
#
#
# ## Channel Qos (Prefetch Level)
#
# It is possible to control how many messages at most a consumer will be given (before it acknowledges
# or rejects previously consumed ones). This setting is per channel and controlled via {GorgonBunny::Channel#prefetch}.
#
#
# ## Channel IDs
#
# Channels are identified by their ids which are integers. GorgonBunny takes care of allocating and
# releasing them as channels are opened and closed. It is almost never necessary to specify
# channel ids explicitly.
#
# There is a limit on the maximum number of channels per connection, usually 65536. Note
# that allocating channels is very cheap on both client and server so having tens, hundreds
# or even thousands of channels is not a problem.
#
# ## Channels and Error Handling
#
# Channel-level exceptions are more common than connection-level ones and often indicate
# issues applications can recover from (such as consuming from or trying to delete
# a queue that does not exist).
#
# With GorgonBunny, channel-level exceptions are raised as Ruby exceptions, for example,
# {GorgonBunny::NotFound}, that provide access to the underlying `channel.close` method
# information.
#
# @example Handling 404 NOT_FOUND
# begin
# ch.queue_delete("queue_that_should_not_exist#{rand}")
# rescue GorgonBunny::NotFound => e
# puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
# end
#
# @example Handling 406 PRECONDITION_FAILED
# begin
# ch2 = conn.create_channel
# q = "bunny.examples.recovery.q#{rand}"
#
# ch2.queue_declare(q, :durable => false)
# ch2.queue_declare(q, :durable => true)
# rescue GorgonBunny::PreconditionFailed => e
# puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
# ensure
# conn.create_channel.queue_delete(q)
# end
#
# @see http://www.rabbitmq.com/tutorials/amqp-concepts.html AMQP 0.9.1 Model Concepts Guide
# @see http://rubybunny.info/articles/getting_started.html Getting Started with RabbitMQ Using GorgonBunny
# @see http://rubybunny.info/articles/queues.html Queues and Consumers
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing
# @see http://rubybunny.info/articles/error_handling.html Error Handling and Recovery Guide
class Channel
#
# API
#
# @return [Integer] Channel id
attr_accessor :id
# @return [GorgonBunny::Session] AMQP connection this channel was opened on
attr_reader :connection
# @return [Symbol] Channel status (:opening, :open, :closed)
attr_reader :status
# @return [GorgonBunny::ConsumerWorkPool] Thread pool delivered messages are dispatched to.
attr_reader :work_pool
# @return [Integer] Next publisher confirmations sequence index
attr_reader :next_publish_seq_no
# @return [Hash<String, GorgonBunny::Queue>] Queue instances declared on this channel
attr_reader :queues
# @return [Hash<String, GorgonBunny::Exchange>] Exchange instances declared on this channel
attr_reader :exchanges
# @return [Set<Integer>] Set of published message indexes that are currently unconfirmed
attr_reader :unconfirmed_set
# @return [Set<Integer>] Set of nacked message indexes that have been nacked
attr_reader :nacked_set
# @return [Hash<String, GorgonBunny::Consumer>] Consumer instances declared on this channel
attr_reader :consumers
DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze
# @param [GorgonBunny::Session] connection AMQP 0.9.1 connection
# @param [Integer] id Channel id, pass nil to make GorgonBunny automatically allocate it
# @param [GorgonBunny::ConsumerWorkPool] work_pool Thread pool for delivery processing, by default of size 1
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@connection = connection
@logger = connection.logger
@id = id || @connection.next_channel_id
@status = :opening
@connection.register_channel(self)
@queues = Hash.new
@exchanges = Hash.new
@consumers = Hash.new
@work_pool = work_pool
# synchronizes frameset delivery. MK.
@publishing_mutex = @connection.mutex_impl.new
@consumer_mutex = @connection.mutex_impl.new
@unconfirmed_set_mutex = @connection.mutex_impl.new
self.reset_continuations
# threads awaiting on continuations. Used to unblock
# them when network connection goes down so that busy loops
# that perform synchronous operations can work. MK.
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
@threads_waiting_on_basic_get_continuations = Set.new
@next_publish_seq_no = 0
@recoveries_counter = GorgonBunny::Concurrent::AtomicFixnum.new(0)
end
attr_reader :recoveries_counter
# @private
def read_write_timeout
@connection.read_write_timeout
end
# Opens the channel and resets its internal state
# @return [GorgonBunny::Channel] Self
# @api public
def open
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
@threads_waiting_on_basic_get_continuations = Set.new
@connection.open_channel(self)
# clear last channel error
@last_channel_error = nil
@status = :open
self
end
# Closes the channel. Closed channels can no longer be used (this includes associated
# {GorgonBunny::Queue}, {GorgonBunny::Exchange} and {GorgonBunny::Consumer} instances.
# @api public
def close
@connection.close_channel(self)
closed!
maybe_kill_consumer_work_pool!
end
# @return [Boolean] true if this channel is open, false otherwise
# @api public
def open?
@status == :open
end
# @return [Boolean] true if this channel is closed (manually or because of an exception), false otherwise
# @api public
def closed?
@status == :closed
end
#
# @group Backwards compatibility with 0.8.0
#
# @return [Integer] Channel id
def number
self.id
end
# @return [Boolean] true if this channel is open
def active
open?
end
# @return [GorgonBunny::Session] Connection this channel was opened on
def client
@connection
end
# @private
def frame_size
@connection.frame_max
end
# @endgroup
#
# Higher-level API, similar to amqp gem
#
# @group Higher-level API for exchange operations
# Declares a fanout exchange or looks it up in the cache of previously
# declared exchanges.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange parameters
#
# @option opts [Boolean] :durable (false) Should the exchange be durable?
# @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
# @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
#
# @return [GorgonBunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def fanout(name, opts = {})
Exchange.new(self, :fanout, name, opts)
end
# Declares a direct exchange or looks it up in the cache of previously
# declared exchanges.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange parameters
#
# @option opts [Boolean] :durable (false) Should the exchange be durable?
# @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
# @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
#
# @return [GorgonBunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def direct(name, opts = {})
Exchange.new(self, :direct, name, opts)
end
# Declares a topic exchange or looks it up in the cache of previously
# declared exchanges.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange parameters
#
# @option opts [Boolean] :durable (false) Should the exchange be durable?
# @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
# @option opts [Hash] :arguments ({}) Optional exchange arguments (used by RabbitMQ extensions)
#
# @return [GorgonBunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def topic(name, opts = {})
Exchange.new(self, :topic, name, opts)
end
# Declares a headers exchange or looks it up in the cache of previously
# declared exchanges.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange parameters
#
# @option opts [Boolean] :durable (false) Should the exchange be durable?
# @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
# @option opts [Hash] :arguments ({}) Optional exchange arguments
#
# @return [GorgonBunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def headers(name, opts = {})
Exchange.new(self, :headers, name, opts)
end
# Provides access to the default exchange
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @api public
def default_exchange
self.direct(GorgonAMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
# Declares a headers exchange or looks it up in the cache of previously
# declared exchanges.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange parameters
#
# @option opts [String,Symbol] :type (:direct) Exchange type, e.g. :fanout or "x-consistent-hash"
# @option opts [Boolean] :durable (false) Should the exchange be durable?
# @option opts [Boolean] :auto_delete (false) Should the exchange be automatically deleted when no longer in use?
# @option opts [Hash] :arguments ({}) Optional exchange arguments
#
# @return [GorgonBunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
def exchange(name, opts = {})
Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end
# @endgroup
# @group Higher-level API for queue operations
# Declares a queue or looks it up in the per-channel cache.
#
# @param [String] name Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).
# @param [Hash] opts Queue properties and other options
#
# @option opts [Boolean] :durable (false) Should this queue be durable?
# @option opts [Boolean] :auto-delete (false) Should this queue be automatically deleted when the last consumer disconnects?
# @option opts [Boolean] :exclusive (false) Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?
# @option opts [Boolean] :arguments ({}) Additional optional arguments (typically used by RabbitMQ extensions and plugins)
#
# @return [GorgonBunny::Queue] Queue that was declared or looked up in the cache
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def queue(name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {})
q = find_queue(name) || GorgonBunny::Queue.new(self, name, opts)
register_queue(q)
end
# @endgroup
# @group QoS and Flow Control
# Sets how many messages will be given to consumers on this channel before they
# have to acknowledge or reject one of the previously consumed messages
#
# @param [Integer] prefetch_count Prefetch (QoS setting) for this channel
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def prefetch(prefetch_count)
self.basic_qos(prefetch_count, false)
end
# Flow control. When set to false, RabbitMQ will stop delivering messages on this
# channel.
#
# @param [Boolean] active Should messages to consumers on this channel be delivered?
# @api public
def flow(active)
channel_flow(active)
end
# Tells RabbitMQ to redeliver unacknowledged messages
# @api public
def recover(ignored = true)
# RabbitMQ only supports basic.recover with requeue = true
basic_recover(true)
end
# @endgroup
# @group Message acknowledgements
# Rejects a message. A rejected message can be requeued or
# dropped by RabbitMQ.
#
# @param [Integer] delivery_tag Delivery tag to reject
# @param [Boolean] requeue Should this message be requeued instead of dropping it?
# @see GorgonBunny::Channel#ack
# @see GorgonBunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def reject(delivery_tag, requeue = false)
guarding_against_stale_delivery_tags(delivery_tag) do
basic_reject(delivery_tag.to_i, requeue)
end
end
# Acknowledges a message. Acknowledged messages are completely removed from the queue.
#
# @param [Integer] delivery_tag Delivery tag to acknowledge
# @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well?
# @see GorgonBunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def ack(delivery_tag, multiple = false)
guarding_against_stale_delivery_tags(delivery_tag) do
basic_ack(delivery_tag.to_i, multiple)
end
end
alias acknowledge ack
# Rejects a message. A rejected message can be requeued or
# dropped by RabbitMQ. This method is similar to {GorgonBunny::Channel#reject} but
# supports rejecting multiple messages at once, and is usually preferred.
#
# @param [Integer] delivery_tag Delivery tag to reject
# @param [Boolean] multiple (false) Should all unacknowledged messages up to this be rejected as well?
# @param [Boolean] requeue (false) Should this message be requeued instead of dropping it?
# @see GorgonBunny::Channel#ack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def nack(delivery_tag, multiple = false, requeue = false)
guarding_against_stale_delivery_tags(delivery_tag) do
basic_nack(delivery_tag.to_i, multiple, requeue)
end
end
# @endgroup
#
# Lower-level API, exposes protocol operations as they are defined in the protocol,
# without any OO sugar on top, by design.
#
# @group Consumer and Message operations (basic.*)
# Publishes a message using basic.publish AMQP 0.9.1 method.
#
# @param [String] payload Message payload. It will never be modified by GorgonBunny or RabbitMQ in any way.
# @param [String] exchange Exchange to publish to
# @param [String] routing_key Routing key
# @param [Hash] opts Publishing options
#
# @option opts [Boolean] :persistent Should the message be persisted to disk?
# @option opts [Boolean] :mandatory Should the message be returned if it cannot be routed to any queue?
# @option opts [Integer] :timestamp A timestamp associated with this message
# @option opts [Integer] :expiration Expiration time after which the message will be deleted
# @option opts [String] :type Message type, e.g. what type of event or command this message represents. Can be any string
# @option opts [String] :reply_to Queue name other apps should send the response to
# @option opts [String] :content_type Message content type (e.g. application/json)
# @option opts [String] :content_encoding Message content encoding (e.g. gzip)
# @option opts [String] :correlation_id Message correlated to this one, e.g. what request this message is a reply for
# @option opts [Integer] :priority Message priority, 0 to 9. Not used by RabbitMQ, only applications
# @option opts [String] :message_id Any message identifier
# @option opts [String] :user_id Optional user ID. Verified by RabbitMQ against the actual connection username
# @option opts [String] :app_id Optional application ID
#
# @return [GorgonBunny::Channel] Self
# @api public
def basic_publish(payload, exchange, routing_key, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
mode = if opts.fetch(:persistent, true)
2
else
1
end
opts[:delivery_mode] ||= mode
opts[:content_type] ||= DEFAULT_CONTENT_TYPE
opts[:priority] ||= 0
if @next_publish_seq_no > 0
@unconfirmed_set.add(@next_publish_seq_no)
@next_publish_seq_no += 1
end
frames = GorgonAMQ::Protocol::Basic::Publish.encode(@id,
payload,
opts,
exchange_name,
routing_key,
opts[:mandatory],
false,
@connection.frame_max)
@connection.send_frameset_without_timeout(frames, self)
self
end
# Synchronously fetches a message from the queue, if there are any. This method is
# for cases when the convenience of synchronous operations is more important than
# throughput.
#
# @param [String] queue Queue name
# @param [Hash] opts Options
#
# @option opts [Boolean] :ack (true) Will this message be acknowledged manually?
#
# @return [Array] A triple of delivery info, message properties and message content
#
# @example Using GorgonBunny::Channel#basic_get with manual acknowledgements
# conn = GorgonBunny.new
# conn.start
# ch = conn.create_channel
# # here we assume the queue already exists and has messages
# delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :ack => true)
# ch.acknowledge(delivery_info.delivery_tag)
# @see GorgonBunny::Queue#pop
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:ack])))
# this is a workaround for the edge case when basic_get is called in a tight loop
# and network goes down we need to perform recovery. The problem is, basic_get will
# keep blocking the thread that calls it without clear way to constantly unblock it
# from the network activity loop (where recovery happens) with the current continuations
# implementation (and even more correct and convenient ones, such as wait/notify, should
# we implement them). So we return a triple of nils immediately which apps should be
# able to handle anyway as "got no message, no need to act". MK.
@last_basic_get_response = if @connection.open?
wait_on_basic_get_continuations
else
[nil, nil, nil]
end
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_get_response
end
# Controls message delivery rate using basic.qos AMQP 0.9.1 method.
#
# @param [Integer] prefetch_count How many messages can consumers on this channel be given at a time
# (before they have to acknowledge or reject one of the earlier received messages)
# @param [Boolean] global (false) Ignored, as it is not supported by RabbitMQ
# @return [GorgonAMQ::Protocol::Basic::QosOk] RabbitMQ response
# @see GorgonBunny::Channel#prefetch
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_qos(prefetch_count, global = false)
raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@prefetch_count = prefetch_count
@last_basic_qos_ok
end
# Redeliver unacknowledged messages
#
# @param [Boolean] requeue Should messages be requeued?
# @return [GorgonAMQ::Protocol::Basic::RecoverOk] RabbitMQ response
# @api public
def basic_recover(requeue)
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Recover.encode(@id, requeue))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_recover_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_recover_ok
end
# Rejects or requeues a message.
#
# @param [Integer] delivery_tag Delivery tag obtained from delivery info
# @param [Boolean] requeue Should the message be requeued?
# @return [NilClass] nil
#
# @example Requeue a message
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# q.subscribe do |delivery_info, properties, payload|
# # requeue the message
# ch.basic_reject(delivery_info.delivery_tag, true)
# end
#
# @example Reject a message
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# q.subscribe do |delivery_info, properties, payload|
# # requeue the message
# ch.basic_reject(delivery_info.delivery_tag, false)
# end
#
# @example Requeue a message fetched via basic.get
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# # we assume the queue exists and has messages
# delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
# ch.basic_reject(delivery_info.delivery_tag, true)
#
# @see GorgonBunny::Channel#basic_nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_reject(delivery_tag, requeue)
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
nil
end
# Acknowledges a delivery (message).
#
# @param [Integer] delivery_tag Delivery tag obtained from delivery info
# @param [Boolean] multiple Should all deliveries up to this one be acknowledged?
# @return [NilClass] nil
#
# @example Ack a message
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# q.subscribe do |delivery_info, properties, payload|
# # requeue the message
# ch.basic_ack(delivery_info.delivery_tag)
# end
#
# @example Ack a message fetched via basic.get
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# # we assume the queue exists and has messages
# delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
# ch.basic_ack(delivery_info.delivery_tag)
#
# @example Ack multiple messages fetched via basic.get
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# # we assume the queue exists and has messages
# _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
# _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
# delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
# # ack all fetched messages up to payload3
# ch.basic_ack(delivery_info.delivery_tag, true)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see #basic_ack_known_delivery_tag
# @api public
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
nil
end
# Rejects or requeues messages just like {GorgonBunny::Channel#basic_reject} but can do so
# with multiple messages at once.
#
# @param [Integer] delivery_tag Delivery tag obtained from delivery info
# @param [Boolean] requeue Should the message be requeued?
# @param [Boolean] multiple Should all deliveries up to this one be rejected/requeued?
# @return [NilClass] nil
#
# @example Requeue a message
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# q.subscribe do |delivery_info, properties, payload|
# # requeue the message
# ch.basic_nack(delivery_info.delivery_tag, false, true)
# end
#
# @example Reject a message
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# q.subscribe do |delivery_info, properties, payload|
# # requeue the message
# ch.basic_nack(delivery_info.delivery_tag)
# end
#
# @example Requeue a message fetched via basic.get
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# # we assume the queue exists and has messages
# delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
# ch.basic_nack(delivery_info.delivery_tag, false, true)
#
#
# @example Requeue multiple messages fetched via basic.get
# conn = GorgonBunny.new
# conn.start
#
# ch = conn.create_channel
# # we assume the queue exists and has messages
# _, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
# _, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
# delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
# # requeue all fetched messages up to payload3
# ch.basic_nack(delivery_info.delivery_tag, true, true)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def basic_nack(delivery_tag, multiple = false, requeue = false)
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Basic::Nack.encode(@id,
delivery_tag,
multiple,
requeue))
nil
end
# Registers a consumer for queue. Delivered messages will be handled with the block
# provided to this method.
#
# @param [String, GorgonBunny::Queue] queue Queue to consume from
# @param [String] consumer_tag Consumer tag (unique identifier), generated by GorgonBunny by default
# @param [Boolean] no_ack (false) If true, delivered messages will be automatically acknowledged.
# If false, manual acknowledgements will be necessary.
# @param [Boolean] exclusive (false) Should this consumer be exclusive?
# @param [Hash] arguments (nil) Optional arguments that may be used by RabbitMQ extensions, etc
#
# @return [GorgonAMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
queue_name = if queue.respond_to?(:name)
queue.name
else
queue
end
# helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
# in the queue already. MK.
if consumer_tag && consumer_tag.strip != GorgonAMQ::Protocol::EMPTY_STRING
add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
end
@connection.send_frame(GorgonAMQ::Protocol::Basic::Consume.encode(@id,
queue_name,
consumer_tag,
false,
no_ack,
exclusive,
false,
arguments))
begin
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
rescue Exception => e
# if basic.consume-ok never arrives, unregister the proactively
# registered consumer. MK.
unregister_consumer(@last_basic_consume_ok.consumer_tag)
raise e
end
# in case there is another exclusive consumer and we get a channel.close
# response here. MK.
raise_if_channel_close!(@last_basic_consume_ok)
# covers server-generated consumer tags
add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)
@last_basic_consume_ok
end
alias consume basic_consume
# Registers a consumer for queue as {GorgonBunny::Consumer} instance.
#
# @param [GorgonBunny::Consumer] consumer Consumer to register. It should already have queue name, consumer tag
# and other attributes set.
#
# @return [GorgonAMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_consume_with(consumer)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
# helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
# in the queue already. MK.
if consumer.consumer_tag && consumer.consumer_tag.strip != GorgonAMQ::Protocol::EMPTY_STRING
register_consumer(consumer.consumer_tag, consumer)
end
@connection.send_frame(GorgonAMQ::Protocol::Basic::Consume.encode(@id,
consumer.queue_name,
consumer.consumer_tag,
false,
consumer.no_ack,
consumer.exclusive,
false,
consumer.arguments))
begin
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
rescue Exception => e
# if basic.consume-ok never arrives, unregister the proactively
# registered consumer. MK.
unregister_consumer(@last_basic_consume_ok.consumer_tag)
raise e
end
# in case there is another exclusive consumer and we get a channel.close
# response here. MK.
raise_if_channel_close!(@last_basic_consume_ok)
# covers server-generated consumer tags
register_consumer(@last_basic_consume_ok.consumer_tag, consumer)
raise_if_continuation_resulted_in_a_channel_error!
@last_basic_consume_ok
end
alias consume_with basic_consume_with
# Removes a consumer. Messages for this consumer will no longer be delivered. If the queue
# it was on is auto-deleted and this consumer was the last one, the queue will be deleted.
#
# @param [String] consumer_tag Consumer tag (unique identifier) to cancel
#
# @return [GorgonAMQ::Protocol::Basic::CancelOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_cancel(consumer_tag)
@connection.send_frame(GorgonAMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_cancel_ok = wait_on_continuations
end
maybe_kill_consumer_work_pool! unless any_consumers?
@last_basic_cancel_ok
end
# @return [Boolean] true if there are consumers on this channel
# @api public
def any_consumers?
@consumer_mutex.synchronize { @consumers.any? }
end
# @endgroup
# @group Queue operations (queue.*)
# Declares a queue using queue.declare AMQP 0.9.1 method.
#
# @param [String] name Queue name
# @param [Hash] opts Queue properties
#
# @option opts [Boolean] durable (false) Should information about this queue be persisted to disk so that it
# can survive broker restarts? Typically set to true for long-lived queues.
# @option opts [Boolean] auto_delete (false) Should this queue be deleted when the last consumer is cancelled?
# @option opts [Boolean] exclusive (false) Should only this connection be able to use this queue?
# If true, the queue will be automatically deleted when this
# connection is closed
# @option opts [Boolean] passive (false) If true, queue will be checked for existence. If it does not
# exist, {GorgonBunny::NotFound} will be raised.
#
# @return [GorgonAMQ::Protocol::Queue::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_declare(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Queue::Declare.encode(@id,
name,
opts.fetch(:passive, false),
opts.fetch(:durable, false),
opts.fetch(:exclusive, false),
opts.fetch(:auto_delete, false),
false,
opts[:arguments]))
@last_queue_declare_ok = wait_on_continuations
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_declare_ok
end
# Deletes a queue using queue.delete AMQP 0.9.1 method
#
# @param [String] name Queue name
# @param [Hash] opts Options
#
# @option opts [Boolean] if_unused (false) Should this queue be deleted only if it has no consumers?
# @option opts [Boolean] if_empty (false) Should this queue be deleted only if it has no messages?
#
# @return [GorgonAMQ::Protocol::Queue::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_delete(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Queue::Delete.encode(@id,
name,
opts[:if_unused],
opts[:if_empty],
false))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_delete_ok
end
# Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.
#
# @param [String] name Queue name
#
# @return [GorgonAMQ::Protocol::Queue::PurgeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_purge(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Queue::Purge.encode(@id, name, false))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_purge_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_purge_ok
end
# Binds a queue to an exchange using queue.bind AMQP 0.9.1 method
#
# @param [String] name Queue name
# @param [String] exchange Exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [GorgonAMQ::Protocol::Queue::BindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @api public
def queue_bind(name, exchange, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
@connection.send_frame(GorgonAMQ::Protocol::Queue::Bind.encode(@id,
name,
exchange_name,
opts[:routing_key],
false,
opts[:arguments]))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_bind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_bind_ok
end
# Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method
#
# @param [String] name Queue name
# @param [String] exchange Exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [GorgonAMQ::Protocol::Queue::UnbindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @api public
def queue_unbind(name, exchange, opts = {})
raise_if_no_longer_open!
exchange_name = if exchange.respond_to?(:name)
exchange.name
else
exchange
end
@connection.send_frame(GorgonAMQ::Protocol::Queue::Unbind.encode(@id,
name,
exchange_name,
opts[:routing_key],
opts[:arguments]))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_unbind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_unbind_ok
end
# @endgroup
# @group Exchange operations (exchange.*)
# Declares a echange using echange.declare AMQP 0.9.1 method.
#
# @param [String] name Exchange name
# @param [Hash] opts Exchange properties
#
# @option opts [Boolean] durable (false) Should information about this echange be persisted to disk so that it
# can survive broker restarts? Typically set to true for long-lived exchanges.
# @option opts [Boolean] auto_delete (false) Should this echange be deleted when it is no longer used?
# @option opts [Boolean] passive (false) If true, exchange will be checked for existence. If it does not
# exist, {GorgonBunny::NotFound} will be raised.
#
# @return [GorgonAMQ::Protocol::Exchange::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/echanges.html Exchanges and Publishing guide
# @api public
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Exchange::Declare.encode(@id,
name,
type.to_s,
opts.fetch(:passive, false),
opts.fetch(:durable, false),
opts.fetch(:auto_delete, false),
false,
false,
opts[:arguments]))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_declare_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_declare_ok
end
# Deletes a exchange using exchange.delete AMQP 0.9.1 method
#
# @param [String] name Exchange name
# @param [Hash] opts Options
#
# @option opts [Boolean] if_unused (false) Should this exchange be deleted only if it is no longer used
#
# @return [GorgonAMQ::Protocol::Exchange::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @api public
def exchange_delete(name, opts = {})
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Exchange::Delete.encode(@id,
name,
opts[:if_unused],
false))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_delete_ok
end
# Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension
# that RabbitMQ provides.
#
# @param [String] source Source exchange name
# @param [String] destination Destination exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [GorgonAMQ::Protocol::Exchange::BindOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def exchange_bind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
source.name
else
source
end
destination_name = if destination.respond_to?(:name)
destination.name
else
destination
end
@connection.send_frame(GorgonAMQ::Protocol::Exchange::Bind.encode(@id,
destination_name,
source_name,
opts[:routing_key],
false,
opts[:arguments]))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_bind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_bind_ok
end
# Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension
# that RabbitMQ provides.
#
# @param [String] source Source exchange name
# @param [String] destination Destination exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [GorgonAMQ::Protocol::Exchange::UnbindOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def exchange_unbind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
source.name
else
source
end
destination_name = if destination.respond_to?(:name)
destination.name
else
destination
end
@connection.send_frame(GorgonAMQ::Protocol::Exchange::Unbind.encode(@id,
destination_name,
source_name,
opts[:routing_key],
false,
opts[:arguments]))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_unbind_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_exchange_unbind_ok
end
# @endgroup
# @group Flow control (channel.*)
# Enables or disables message flow for the channel. When message flow is disabled,
# no new messages will be delivered to consumers on this channel. This is typically
# used by consumers that cannot keep up with the influx of messages.
#
# @note Recent (e.g. 2.8.x., 3.x) RabbitMQ will employ TCP/IP-level back pressure on publishers if it detects
# that consumers do not keep up with them.
#
# @return [GorgonAMQ::Protocol::Channel::FlowOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def channel_flow(active)
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Channel::Flow.encode(@id, active))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_channel_flow_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_channel_flow_ok
end
# @endgroup
# @group Transactions (tx.*)
# Puts the channel into transaction mode (starts a transaction)
# @return [GorgonAMQ::Protocol::Tx::SelectOk] RabbitMQ response
# @api public
def tx_select
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Tx::Select.encode(@id))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_select_ok
end
# Commits current transaction
# @return [GorgonAMQ::Protocol::Tx::CommitOk] RabbitMQ response
# @api public
def tx_commit
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Tx::Commit.encode(@id))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_commit_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_commit_ok
end
# Rolls back current transaction
# @return [GorgonAMQ::Protocol::Tx::RollbackOk] RabbitMQ response
# @api public
def tx_rollback
raise_if_no_longer_open!
@connection.send_frame(GorgonAMQ::Protocol::Tx::Rollback.encode(@id))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_rollback_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_rollback_ok
end
# @endgroup
# @group Publisher Confirms (confirm.*)
# @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise
# @api public
def using_publisher_confirmations?
@next_publish_seq_no > 0
end
# Enables publisher confirms for the channel.
# @return [GorgonAMQ::Protocol::Confirm::SelectOk] RabbitMQ response
# @see #wait_for_confirms
# @see #unconfirmed_set
# @see #nacked_set
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def confirm_select(callback=nil)
raise_if_no_longer_open!
if @next_publish_seq_no == 0
@confirms_continuations = new_continuation
@unconfirmed_set = Set.new
@nacked_set = Set.new
@next_publish_seq_no = 1
end
@confirms_callback = callback
@connection.send_frame(GorgonAMQ::Protocol::Confirm::Select.encode(@id, false))
GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_confirm_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
@last_confirm_select_ok
end
# Blocks calling thread until confirms are received for all
# currently unacknowledged published messages.
#
# @return [Boolean] true if all messages were acknowledged positively, false otherwise
# @see #confirm_select
# @see #unconfirmed_set
# @see #nacked_set
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def wait_for_confirms
@only_acks_received = true
wait_on_confirms_continuations
@only_acks_received
end
# @endgroup
# @group Misc
# Synchronizes given block using this channel's mutex.
# @api public
def synchronize(&block)
@publishing_mutex.synchronize(&block)
end
# Unique string supposed to be used as a consumer tag.
#
# @return [String] Unique string.
# @api plugin
def generate_consumer_tag(name = "bunny")
"#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
# @endgroup
#
# Error Handilng
#
# Defines a handler for errors that are not responses to a particular
# operations (e.g. basic.ack, basic.reject, basic.nack).
#
# @api public
def on_error(&block)
@on_error = block
end
#
# Recovery
#
# @group Network Failure Recovery
# Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_from_network_failure
@logger.debug "Recovering channel #{@id} after network failure"
release_all_continuations
recover_prefetch_setting
recover_exchanges
# this includes recovering bindings
recover_queues
recover_consumers
increment_recoveries_counter
end
# Recovers basic.qos setting. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_prefetch_setting
basic_qos(@prefetch_count) if @prefetch_count
end
# Recovers exchanges. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_exchanges
@exchanges.values.dup.each do |x|
x.recover_from_network_failure
end
end
# Recovers queues and bindings. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_queues
@queues.values.dup.each do |q|
@logger.debug "Recovering queue #{q.name}"
q.recover_from_network_failure
end
end
# Recovers consumers. Used by the Automatic Network Failure
# Recovery feature.
#
# @api plugin
def recover_consumers
unless @consumers.empty?
@work_pool = ConsumerWorkPool.new(@work_pool.size)
@work_pool.start
end
@consumers.values.dup.each do |c|
c.recover_from_network_failure
end
end
# @private
def increment_recoveries_counter
@recoveries_counter.increment
end
# @endgroup
# @return [String] Brief human-readable representation of the channel
def to_s
"#<#{self.class.name}:#{object_id} @id=#{self.number} @connection=#{@connection.to_s}>"
end
#
# Implementation
#
# @private
def register_consumer(consumer_tag, consumer)
@consumer_mutex.synchronize do
@consumers[consumer_tag] = consumer
end
end
# @private
def unregister_consumer(consumer_tag)
@consumer_mutex.synchronize do
@consumers.delete(consumer_tag)
end
end
# @private
def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block)
@consumer_mutex.synchronize do
c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
c.on_delivery(&block) if block
@consumers[consumer_tag] = c
end
end
# @private
def handle_method(method)
@logger.debug "Channel#handle_frame on channel #{@id}: #{method.inspect}"
case method
when GorgonAMQ::Protocol::Queue::DeclareOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Queue::DeleteOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Queue::PurgeOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Queue::BindOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Queue::UnbindOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Exchange::BindOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Exchange::UnbindOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Exchange::DeclareOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Exchange::DeleteOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Basic::QosOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Basic::RecoverOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Channel::FlowOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Basic::ConsumeOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Basic::Cancel then
if consumer = @consumers[method.consumer_tag]
@work_pool.submit do
begin
@consumers.delete(method.consumer_tag)
consumer.handle_cancellation(method)
rescue Exception => e
@logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!"
end
end
else
@logger.warn "No consumer for tag #{method.consumer_tag} on channel #{@id}!"
end
when GorgonAMQ::Protocol::Basic::CancelOk then
@continuations.push(method)
unregister_consumer(method.consumer_tag)
when GorgonAMQ::Protocol::Tx::SelectOk, GorgonAMQ::Protocol::Tx::CommitOk, GorgonAMQ::Protocol::Tx::RollbackOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Tx::SelectOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Confirm::SelectOk then
@continuations.push(method)
when GorgonAMQ::Protocol::Basic::Ack then
handle_ack_or_nack(method.delivery_tag, method.multiple, false)
when GorgonAMQ::Protocol::Basic::Nack then
handle_ack_or_nack(method.delivery_tag, method.multiple, true)
when GorgonAMQ::Protocol::Channel::Close then
closed!
@connection.send_frame(GorgonAMQ::Protocol::Channel::CloseOk.encode(@id))
# basic.ack, basic.reject, basic.nack. MK.
if channel_level_exception_after_operation_that_has_no_response?(method)
@on_error.call(self, method) if @on_error
else
@last_channel_error = instantiate_channel_level_exception(method)
@continuations.push(method)
end
when GorgonAMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
else
raise "Do not know how to handle #{method.inspect} in GorgonBunny::Channel#handle_method"
end
end
# @private
def channel_level_exception_after_operation_that_has_no_response?(method)
method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/
end
# @private
def handle_basic_get_ok(basic_get_ok, properties, content)
basic_get_ok.delivery_tag = VersionedDeliveryTag.new(basic_get_ok.delivery_tag, @recoveries_counter.get)
@basic_get_continuations.push([basic_get_ok, properties, content])
end
# @private
def handle_basic_get_empty(basic_get_empty)
@basic_get_continuations.push([nil, nil, nil])
end
# @private
def handle_frameset(basic_deliver, properties, content)
consumer = @consumers[basic_deliver.consumer_tag]
if consumer
@work_pool.submit do
consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
end
else
@logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
end
end
# @private
def handle_basic_return(basic_return, properties, content)
x = find_exchange(basic_return.exchange)
if x
x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content)
else
@logger.warn "Exchange #{basic_return.exchange} is not in channel #{@id}'s cache! Dropping returned message!"
end
end
# @private
def handle_ack_or_nack(delivery_tag, multiple, nack)
if nack
cloned_set = @unconfirmed_set.clone
if multiple
cloned_set.keep_if { |i| i <= delivery_tag }
@nacked_set.merge(cloned_set)
else
@nacked_set.add(delivery_tag)
end
end
if multiple
@unconfirmed_set.delete_if { |i| i <= delivery_tag }
else
@unconfirmed_set.delete(delivery_tag)
end
@unconfirmed_set_mutex.synchronize do
@only_acks_received = (@only_acks_received && !nack)
@confirms_continuations.push(true) if @unconfirmed_set.empty?
@confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
end
end
# @private
def wait_on_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_continuations << t
begin
@continuations.poll(@connection.continuation_timeout)
ensure
@threads_waiting_on_continuations.delete(t)
end
else
connection.reader_loop.run_once until @continuations.length > 0
@continuations.pop
end
end
# @private
def wait_on_basic_get_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_basic_get_continuations << t
begin
@basic_get_continuations.poll(@connection.continuation_timeout)
ensure
@threads_waiting_on_basic_get_continuations.delete(t)
end
else
connection.event_loop.run_once until @basic_get_continuations.length > 0
@basic_get_continuations.pop
end
end
# @private
def wait_on_confirms_continuations
if @connection.threaded
t = Thread.current
@threads_waiting_on_confirms_continuations << t
begin
@confirms_continuations.poll(@connection.continuation_timeout)
ensure
@threads_waiting_on_confirms_continuations.delete(t)
end
else
connection.event_loop.run_once until @confirms_continuations.length > 0
@confirms_continuations.pop
end
end
# Releases all continuations. Used by automatic network recovery.
# @private
def release_all_continuations
@threads_waiting_on_confirms_continuations.each do |t|
t.run
end
@threads_waiting_on_continuations.each do |t|
t.run
end
@threads_waiting_on_basic_get_continuations.each do |t|
t.run
end
self.reset_continuations
end
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
# that won't do any real work for channels that do not register consumers (e.g. only used for
# publishing). MK.
# @private
def maybe_start_consumer_work_pool!
if @work_pool && !@work_pool.running?
@work_pool.start
end
end
# @private
def maybe_pause_consumer_work_pool!
@work_pool.pause if @work_pool && @work_pool.running?
end
# @private
def maybe_kill_consumer_work_pool!
if @work_pool && @work_pool.running?
@work_pool.kill
end
end
# @private
def read_next_frame(options = {})
@connection.read_next_frame(options = {})
end
# @private
def deregister_queue(queue)
@queues.delete(queue.name)
end
# @private
def deregister_queue_named(name)
@queues.delete(name)
end
# @private
def register_queue(queue)
@queues[queue.name] = queue
end
# @private
def find_queue(name)
@queues[name]
end
# @private
def deregister_exchange(exchange)
@exchanges.delete(exchange.name)
end
# @private
def register_exchange(exchange)
@exchanges[exchange.name] = exchange
end
# @private
def find_exchange(name)
@exchanges[name]
end
protected
# @private
def closed!
@status = :closed
@work_pool.shutdown
@connection.release_channel_id(@id)
end
# @private
def instantiate_channel_level_exception(frame)
case frame
when GorgonAMQ::Protocol::Channel::Close then
klass = case frame.reply_code
when 403 then
AccessRefused
when 404 then
NotFound
when 405 then
ResourceLocked
when 406 then
PreconditionFailed
else
ChannelLevelException
end
klass.new(frame.reply_text, self, frame)
end
end
# @private
def raise_if_continuation_resulted_in_a_channel_error!
raise @last_channel_error if @last_channel_error
end
# @private
def raise_if_no_longer_open!
raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed?
end
# @private
def raise_if_channel_close!(method)
if method && method.is_a?(GorgonAMQ::Protocol::Channel::Close)
# basic.ack, basic.reject, basic.nack. MK.
if channel_level_exception_after_operation_that_has_no_response?(method)
@on_error.call(self, method) if @on_error
else
@last_channel_error = instantiate_channel_level_exception(method)
raise @last_channel_error
end
end
end
# @private
def reset_continuations
@continuations = new_continuation
@confirms_continuations = new_continuation
@basic_get_continuations = new_continuation
end
if defined?(JRUBY_VERSION)
# @private
def new_continuation
Concurrent::LinkedContinuationQueue.new
end
else
# @private
def new_continuation
Concurrent::ContinuationQueue.new
end
end # if defined?
# @private
def guarding_against_stale_delivery_tags(tag, &block)
case tag
# if a fixnum was passed, execute unconditionally. MK.
when Fixnum then
block.call
# versioned delivery tags should be checked to avoid
# sending out stale (invalid) tags after channel was reopened
# during network failure recovery. MK.
when VersionedDeliveryTag then
if !tag.stale?(@recoveries_counter.get)
block.call
end
end
end
end # Channel
end # GorgonBunny