chuckremes/ffi-rzmq

View on GitHub
lib/ffi-rzmq/poll.rb

Summary

Maintainability
A
45 mins
Test Coverage
require 'forwardable'

module ZMQ
  class Poller
    extend Forwardable

    def_delegators :@poll_items, :size, :inspect
    attr_reader :readables, :writables

    def initialize
      @poll_items = ZMQ::PollItems.new
      @readables = []
      @writables = []
    end

    # Checks each registered socket for selectability based on the poll items'
    # registered +events+. Will block for up to +timeout+ milliseconds.
    # A millisecond is 1/1000 of a second, so to block for 1 second
    # pass the value "1000" to #poll.
    #
    # Pass "-1" or +:blocking+ for +timeout+ for this call to block
    # indefinitely.
    #
    # This method will return *immediately* when there are no registered
    # sockets. In that case, the +timeout+ parameter is not honored. To
    # prevent a CPU busy-loop, the caller of this method should detect
    # this possible condition (via #size) and throttle the call
    # frequency.
    #
    # Returns 0 when there are no registered sockets that are readable
    # or writable.
    #
    # Return 1 (or greater) to indicate the number of readable or writable
    # sockets. These sockets should be processed using the #readables and
    # #writables accessors.
    #
    # Returns -1 when there is an error. Use ZMQ::Util.errno to get the related
    # error number.
    #
    def poll timeout = :blocking
      unless @poll_items.empty?
        timeout = adjust timeout
        items_triggered = LibZMQ.zmq_poll @poll_items.address, @poll_items.size, timeout

        update_selectables if Util.resultcode_ok?(items_triggered)
        items_triggered
      else
        0
      end
    end

    # The non-blocking version of #poll. See the #poll description for
    # potential exceptions.
    #
    # May return -1 when an error is encounted. Check ZMQ::Util.errno
    # to determine the underlying cause.
    #
    def poll_nonblock
      poll 0
    end

    # Register the +pollable+ for +events+. This method is idempotent meaning
    # it can be called multiple times with the same data and the socket
    # will only get registered at most once. Calling multiple times with
    # different values for +events+ will OR the event information together.
    #
    def register pollable, events = ZMQ::POLLIN | ZMQ::POLLOUT
      return if pollable.nil? || events.zero?

      unless item = @poll_items[pollable]
        item = PollItem.from_pollable(pollable)
        @poll_items << item
      end

      item.events |= events
    end

    # Deregister the +pollable+ for +events+. When there are no events left
    # or socket has been closed this also deletes the socket from the poll items.
    #
    def deregister pollable, events
      return unless pollable

      item = @poll_items[pollable]
      if item && (item.events & events) > 0
        item.events ^= events
        delete(pollable) if item.events.zero? || item.closed?
        true
      else
        false
      end
    end

    # A helper method to register a +pollable+ as readable events only.
    #
    def register_readable pollable
      register pollable, ZMQ::POLLIN
    end

    # A helper method to register a +pollable+ for writable events only.
    #
    def register_writable pollable
      register pollable, ZMQ::POLLOUT
    end

    # A helper method to deregister a +pollable+ for readable events.
    #
    def deregister_readable pollable
      deregister pollable, ZMQ::POLLIN
    end

    # A helper method to deregister a +pollable+ for writable events.
    #
    def deregister_writable pollable
      deregister pollable, ZMQ::POLLOUT
    end

    # Deletes the +pollable+ for all subscribed events. Called internally
    # when a socket has been deregistered and has no more events
    # registered anywhere.
    #
    # Can also be called directly to remove the socket from the polling
    # array.
    #
    def delete pollable
      return false if @poll_items.empty?
      @poll_items.delete(pollable)
    end

    def to_s; inspect; end

    private

    def update_selectables
      @readables.clear
      @writables.clear

      @poll_items.each do |poll_item|
        @readables << poll_item.pollable if poll_item.readable?
        @writables << poll_item.pollable if poll_item.writable?
      end
    end

    # Convert the timeout value to something usable by
    # the library.
    #
    # -1 or :blocking should be converted to -1.
    #
    # Users will pass in values measured as
    # milliseconds, so we need to convert that value to
    # microseconds for the library.
    #
    def adjust timeout
      if :blocking == timeout || -1 == timeout
        -1
      else
        timeout.to_i
      end
    end

  end
end