lib/ffi-rzmq/poll.rb
require 'forwardable'
module ZMQ
class Poller
extend Forwardable
def_delegators :@poll_items, :size, :inspect
attr_reader :readables, :writables
def initialize
@poll_items = ZMQ::PollItems.new
@readables = []
@writables = []
end
# Checks each registered socket for selectability based on the poll items'
# registered +events+. Will block for up to +timeout+ milliseconds.
# A millisecond is 1/1000 of a second, so to block for 1 second
# pass the value "1000" to #poll.
#
# Pass "-1" or +:blocking+ for +timeout+ for this call to block
# indefinitely.
#
# This method will return *immediately* when there are no registered
# sockets. In that case, the +timeout+ parameter is not honored. To
# prevent a CPU busy-loop, the caller of this method should detect
# this possible condition (via #size) and throttle the call
# frequency.
#
# Returns 0 when there are no registered sockets that are readable
# or writable.
#
# Return 1 (or greater) to indicate the number of readable or writable
# sockets. These sockets should be processed using the #readables and
# #writables accessors.
#
# Returns -1 when there is an error. Use ZMQ::Util.errno to get the related
# error number.
#
def poll timeout = :blocking
unless @poll_items.empty?
timeout = adjust timeout
items_triggered = LibZMQ.zmq_poll @poll_items.address, @poll_items.size, timeout
update_selectables if Util.resultcode_ok?(items_triggered)
items_triggered
else
0
end
end
# The non-blocking version of #poll. See the #poll description for
# potential exceptions.
#
# May return -1 when an error is encounted. Check ZMQ::Util.errno
# to determine the underlying cause.
#
def poll_nonblock
poll 0
end
# Register the +pollable+ for +events+. This method is idempotent meaning
# it can be called multiple times with the same data and the socket
# will only get registered at most once. Calling multiple times with
# different values for +events+ will OR the event information together.
#
def register pollable, events = ZMQ::POLLIN | ZMQ::POLLOUT
return if pollable.nil? || events.zero?
unless item = @poll_items[pollable]
item = PollItem.from_pollable(pollable)
@poll_items << item
end
item.events |= events
end
# Deregister the +pollable+ for +events+. When there are no events left
# or socket has been closed this also deletes the socket from the poll items.
#
def deregister pollable, events
return unless pollable
item = @poll_items[pollable]
if item && (item.events & events) > 0
item.events ^= events
delete(pollable) if item.events.zero? || item.closed?
true
else
false
end
end
# A helper method to register a +pollable+ as readable events only.
#
def register_readable pollable
register pollable, ZMQ::POLLIN
end
# A helper method to register a +pollable+ for writable events only.
#
def register_writable pollable
register pollable, ZMQ::POLLOUT
end
# A helper method to deregister a +pollable+ for readable events.
#
def deregister_readable pollable
deregister pollable, ZMQ::POLLIN
end
# A helper method to deregister a +pollable+ for writable events.
#
def deregister_writable pollable
deregister pollable, ZMQ::POLLOUT
end
# Deletes the +pollable+ for all subscribed events. Called internally
# when a socket has been deregistered and has no more events
# registered anywhere.
#
# Can also be called directly to remove the socket from the polling
# array.
#
def delete pollable
return false if @poll_items.empty?
@poll_items.delete(pollable)
end
def to_s; inspect; end
private
def update_selectables
@readables.clear
@writables.clear
@poll_items.each do |poll_item|
@readables << poll_item.pollable if poll_item.readable?
@writables << poll_item.pollable if poll_item.writable?
end
end
# Convert the timeout value to something usable by
# the library.
#
# -1 or :blocking should be converted to -1.
#
# Users will pass in values measured as
# milliseconds, so we need to convert that value to
# microseconds for the library.
#
def adjust timeout
if :blocking == timeout || -1 == timeout
-1
else
timeout.to_i
end
end
end
end