lib/zmachine/zmq_channel.rb
require 'zmachine'
require 'zmachine/channel'
module ZMachine
class ZMQChannel < Channel
extend Forwardable
def_delegator :@socket, :identity
def_delegator :@socket, :identity=
def initialize
super
@raw = true
end
def selectable_fd
@socket.fd
end
def bind(address, type)
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
@bound = true
@connected = true
@socket = ZContext.create_socket(type)
@socket.bind(address)
end
def bound?
@bound
end
def accept
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
self
end
def connect(address, type)
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
@connection_pending = true
@socket = ZContext.create_socket(type)
@socket.connect(address)
end
def connection_pending?
@connection_pending
end
def finish_connecting
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
return unless connection_pending?
@connected = true
end
def connected?
@connected
end
def read_inbound_data
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
return nil unless can_recv?
data = ZMsg.recv_msg(@socket)
data = String.from_java_bytes(data.first.data) unless @raw
data
end
def close!
ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
@closed = true
@connected = false
@bound = false
ZContext.destroy_socket(@socket)
end
def closed?
@closed
end
def peer
raise RuntimeError.new("ZMQChannel has no peer")
end
# see comment in ConnectionManager#process
def can_recv?
@socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
end
end
end