colstrom/ezmq

View on GitHub
lib/ezmq/subscribe.rb

Summary

Maintainability
A
0 mins
Test Coverage
require_relative 'socket'

# Syntactic sugar for 0MQ, because Ruby shouldn't feel like C.
module EZMQ
  # Subscribe socket that listens for messages with an optional topic.
  class Subscriber < EZMQ::Socket
    # Creates a new Subscriber socket.
    #
    # @note The default behaviour is to output and messages received to STDOUT.
    #
    # @param [:bind, :connect] mode (:connect) a mode for the socket.
    # @param [Hash] options optional parameters.
    # @option options [String] topic a topic to subscribe to.
    # @see EZMQ::Socket EZMQ::Socket for optional parameters.
    #
    # @return [Publisher] a new instance of Publisher.
    #
    def initialize(mode = :connect, **options)
      super mode, ZMQ::SUB, options
      subscribe options[:topic] if options[:topic]
    end

    # Receive a message from the socket.
    #
    # @note This method blocks until a message arrives.
    #
    # @param [Hash] options optional parameters.
    # @option options [lambda] decode how to decode the message.
    #
    # @yield [message, topic] passes the message body and topic to the block.
    # @yieldparam [Object] message the message received (decoded).
    # @yieldparam [String] topic the topic of the message.
    #
    # @return [Object] the message received (decoded).
    #
    def receive(**options)
      message = ''
      @socket.recv_string message

      message = message.match(/^(?<topic>[^\ ]*)\ (?<body>.*)/m)

      decoded = (options[:decode] || @decode).call message['body']
      if block_given?
        yield decoded, message['topic']
      else
        [decoded, message['topic']]
      end
    end

    # Like receive, but doesn't stop at one message.
    #
    # @yield [message, topic] passes the message body and topic to the block.
    # @yieldparam [String] message the message received.
    # @yieldparam [String] topic the topic of the message.
    #
    # @return [void]
    #
    def listen(&block)
      loop do
        block.call(*receive)
      end
    end

    # Establishes a new message filter on the socket.
    #
    # @note By default, a Subscriber filters all incoming messages. Without
    # calling subscribe at least once, no messages will be accepted. If topic
    # was provided, #initialize calls #subscribe automatically.
    #
    # @param [String] topic a topic to subscribe to. Messages matching this
    # prefix will be accepted.
    #
    # @return [Boolean] was subscription successful?
    #
    def subscribe(topic)
      @socket.setsockopt(ZMQ::SUBSCRIBE, topic) == 0
    end

    # Removes a message filter (as set with subscribe) from the socket.
    #
    # @param [String] topic the topic to unsubscribe from. If multiple filters
    #   with the same topic are set, this will only remove one.
    #
    # @return [Boolean] was unsubscription successful?
    #
    def unsubscribe(topic)
      @socket.setsockopt(ZMQ::UNSUBSCRIBE, topic) == 0
    end
  end
end