liquidm/zmachine

View on GitHub
lib/zmachine/zmq_channel.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'zmachine'
require 'zmachine/channel'

module ZMachine
  class ZMQChannel < Channel

    extend Forwardable

    def_delegator :@socket, :identity
    def_delegator :@socket, :identity=

    def initialize
      super
      @raw = true
    end

    def selectable_fd
      @socket.fd
    end

    def bind(address, type)
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      @bound = true
      @connected = true
      @socket = ZContext.create_socket(type)
      @socket.bind(address)
    end

    def bound?
      @bound
    end

    def accept
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      self
    end

    def connect(address, type)
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      @connection_pending = true
      @socket = ZContext.create_socket(type)
      @socket.connect(address)
    end

    def connection_pending?
      @connection_pending
    end

    def finish_connecting
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      return unless connection_pending?
      @connected = true
    end

    def connected?
      @connected
    end

    def read_inbound_data
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      return nil unless can_recv?
      data = ZMsg.recv_msg(@socket)
      data = String.from_java_bytes(data.first.data) unless @raw
      data
    end

    def close!
      ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
      @closed = true
      @connected = false
      @bound = false
      ZContext.destroy_socket(@socket)
    end

    def closed?
      @closed
    end

    def peer
      raise RuntimeError.new("ZMQChannel has no peer")
    end

    # see comment in ConnectionManager#process
    def can_recv?
      @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
    end

  end
end