chuckremes/ffi-rzmq

View on GitHub
examples/latency_measurement.rb

Summary

Maintainability
A
2 hrs
Test Coverage

require File.join(File.dirname(__FILE__), '..', 'lib', 'ffi-rzmq')

# Within a single process, we start up two threads. One thread has a REQ (request)
# socket and the second thread has a REP (reply) socket. We measure the
# *round-trip* latency between these sockets. Only *one* message is in flight at
# any given moment.
#
# This example also illustrates how a single context can be shared amongst several
# threads. Sharing a single context also allows a user to specify the "inproc"
# transport in addition to "tcp" and "ipc".
#
#  % ruby latency_measurement.rb tcp://127.0.0.1:5555 1024 1_000_000
#
#  % ruby latency_measurement.rb inproc://lm_sock 1024 1_000_000
#

if ARGV.length < 3
  puts "usage: ruby latency_measurement.rb <connect-to> <message-size> <roundtrip-count>"
  exit
end

link = ARGV[0]
message_size = ARGV[1].to_i
roundtrip_count = ARGV[2].to_i

def assert(rc)
  raise "Last API call failed at #{caller(1)}" unless rc >= 0
end

begin
  master_context = ZMQ::Context.new
rescue ContextError => e
  STDERR.puts "Failed to allocate context or socket!"
  raise
end


class Receiver
  def initialize context, link, size, count
    @context = context
    @link = link
    @size = size
    @count = count

    begin
      @socket = @context.socket(ZMQ::REP)
    rescue ContextError => e
      STDERR.puts "Failed to allocate REP socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    assert(@socket.setsockopt(ZMQ::RCVHWM, 100))
    assert(@socket.setsockopt(ZMQ::SNDHWM, 100))

    assert(@socket.bind(@link))
  end

  def run
    @count.times do
      string = ''
      assert(@socket.recv_string(string, 0))

      raise "Message size doesn't match, expected [#{@size}] but received [#{string.size}]" if @size != string.size

      assert(@socket.send_string(string, 0))
    end

    assert(@socket.close)
  end
end

class Transmitter
  def initialize context, link, size, count
    @context = context
    @link = link
    @size = size
    @count = count

    begin
      @socket = @context.socket(ZMQ::REQ)
    rescue ContextError => e
      STDERR.puts "Failed to allocate REP socket!"
      raise
    end

    assert(@socket.setsockopt(ZMQ::LINGER, 100))
    assert(@socket.setsockopt(ZMQ::RCVHWM, 100))
    assert(@socket.setsockopt(ZMQ::SNDHWM, 100))

    assert(@socket.connect(@link))
  end

  def run
    msg = "#{ '3' * @size }"

    elapsed = elapsed_microseconds do
      @count.times do
        assert(@socket.send_string(msg, 0))
        assert(@socket.recv_string(msg, 0))

        raise "Message size doesn't match, expected [#{@size}] but received [#{msg.size}]" if @size != msg.size
      end
    end

    latency = elapsed / @count / 2

    puts "message size: %i [B]" % @size
    puts "roundtrip count: %i" % @count
    puts "throughput (msgs/s): %i" % (@count / (elapsed / 1_000_000))
    puts "mean latency: %.3f [us]" % latency
    assert(@socket.close)
  end

  def elapsed_microseconds(&blk)
    start = Time.now
    yield
    value = ((Time.now - start) * 1_000_000)
  end
end

threads = []
threads << Thread.new do
  receiver = Receiver.new(master_context, link, message_size, roundtrip_count)
  receiver.run
end

sleep 1

threads << Thread.new do
  transmitter = Transmitter.new(master_context, link, message_size, roundtrip_count)
  transmitter.run
end

threads.each {|t| t.join}

master_context.terminate