lib/roma/tools/tribunus.rb

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/ruby
require 'socket'
require 'ipaddr'
require 'optparse'
# UDP port and multicast group on which tribunus instances talk to each other.
UDP_PORT = 14329
MULTICAST_ADDR = "225.0.0.123"

# Directory two levels above this file; child ruby processes get it via -I.
ROMA_LOAD_PATH = File.expand_path("../..", File.dirname(__FILE__))
RUBY_COMMAND_OPTIONS = ["-I", ROMA_LOAD_PATH]
ROMAD_OPTIONS = ["--replication_in_host"]

# The romad and mkroute executables are looked up three levels above this file, in bin/.
romad_bin_dir = File.expand_path("../../../bin", File.dirname(__FILE__))
ROMAD_PATH = File.expand_path("romad", romad_bin_dir)
MKROUTE_PATH = File.expand_path("mkroute", romad_bin_dir)

# Default working directory for spawned romad/mkroute processes.
ROMAD_WORK_DIR = '.'
class Tribunus
  # Wire protocol: one UDP datagram per message, of the form
  #
  #   update:<reply_count>: <romad_hostname> [<romad_port> ...]
  #
  # A receiver records the sender (keyed by its IP address) and, when
  # reply_count > 0, answers it with its own update carrying reply_count - 1.

  RomadHost = Struct.new(:hostname, :ports, :updated_at)

  # A remote host not heard from for this many seconds is sent an update.
  HEARTBEAT_SECONDS = 300
  # Sleep between two passes of heartbeat_loop, in seconds.
  HEARTBEAT_LOOP_INTERVAL = 50
  # A remote host not heard from for this many seconds is forgotten.
  TIMEOUT_SECONDS = 600

  # @param romad_hostname [String] hostname given to every local romad
  # @param romad_ports [Array<Integer>] one local romad is started per port
  # @param options [Hash] optional overrides: :multicast_addr, :udp_port,
  #   :romad_work_dir, :ruby_command_name, :verbose
  def initialize(romad_hostname, romad_ports, options = {})
    @multi_addr = options[:multicast_addr] || MULTICAST_ADDR
    @port = options[:udp_port] || UDP_PORT
    @romad_work_dir = options[:romad_work_dir] || ROMAD_WORK_DIR
    @ruby_command_name = options[:ruby_command_name] || "ruby"
    @verbose = options[:verbose]
    log [:initalized, @multi_addr, @port, @romad_work_dir, @ruby_command_name]

    @threads = []
    @romads = {} # port => pid of the local romad
    @local_romad_host = RomadHost.new(romad_hostname, romad_ports, nil)

    # Guards @remote_servers: the server_loop thread writes it while the
    # heartbeat thread and start_discover read it.
    @mutex = Mutex.new
    @remote_servers = {} # ipaddr => RomadHost
  end

  # Writes +obj+ to stderr (Strings as-is, anything else inspected), only
  # when verbose mode is on.
  def log(obj)
    return unless @verbose
    $stderr.puts(obj.is_a?(String) ? obj : obj.inspect)
  end
  private :log

  # True unless +ipaddr+ is one of this machine's own addresses; used to
  # ignore datagrams this host sent itself (e.g. its own multicast).
  def from_remote?(ipaddr)
    Socket.ip_address_list.none? { |addr| addr.ip_address == ipaddr }
  end
  private :from_remote?

  # Starts the local romads as a brand-new ring (no remote node in the route).
  def spawn_new_roma_ring
    spawn_romads(nil, nil)
  end

  # Builds a routing table with mkroute for all local nodes (plus
  # remote_host_remote_port when remote_host is given), waits for it, then
  # starts one romad per local port.
  # @raise [RuntimeError] if mkroute does not exit successfully
  def spawn_romads(remote_host, remote_port)
    nodes = @local_romad_host.ports.map { |port| "#{@local_romad_host.hostname}_#{port}" }
    nodes << "#{remote_host}_#{remote_port}" if remote_host
    pid = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS, MKROUTE_PATH, *nodes,
                        chdir: @romad_work_dir)
    raise "failed to make route" unless Process.waitpid2(pid)[1].success?

    @local_romad_host.ports.each do |port|
      @romads[port] = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS, ROMAD_PATH, *ROMAD_OPTIONS,
                                    "-p", port.to_s, @local_romad_host.hostname,
                                    chdir: @romad_work_dir)
    end
  end

  # Starts every local romad joining the ring via remote_host_remote_port.
  # @return [Array<Integer>] pids of the spawned romads
  def spawn_romads_join(remote_host, remote_port)
    @local_romad_host.ports.map { |port| spawn_romad_join(port, remote_host, remote_port) }
  end

  # Starts one romad on +port+ with "-j <remote_host>_<remote_port>".
  # @return [Integer] pid of the spawned romad
  def spawn_romad_join(port, remote_host, remote_port)
    @romads[port] = Process.spawn(@ruby_command_name, *RUBY_COMMAND_OPTIONS, ROMAD_PATH, *ROMAD_OPTIONS,
                                  "-p", port.to_s, @local_romad_host.hostname,
                                  "-j", "#{remote_host}_#{remote_port}",
                                  chdir: @romad_work_dir)
  end

  # Handles the payload of an "update" message from +ipaddr+: records the
  # sender's hostname and ports, and replies when reply_count > 0.
  def receive_update_command(ipaddr, reply_count, params)
    hostname, *ports = params.strip.split(/\s+/)
    return if hostname.nil?

    rhost = RomadHost.new(hostname, ports.map(&:to_i), Time.now)
    @mutex.synchronize { @remote_servers[ipaddr] = rhost }
    unicast(ipaddr, update_message(reply_count - 1)) if reply_count > 0
  end

  # Parses "<command>:<reply_count>:<rest>" and dispatches known commands;
  # malformed messages and unknown commands are ignored.
  def run_command(ipaddr, msg)
    match_data = /\A(.*):(\d+):(.*)/.match(msg)
    return unless match_data

    command, reply_count, rest = match_data.captures
    case command
    when "update"
      receive_update_command(ipaddr, reply_count.to_i, rest)
    end
  end
  private :run_command

  # Builds an update message advertising this host. The ports of the local
  # romads spawned so far are appended unless +initial+ is true.
  def update_message(reply_count, initial = false)
    fields = ["update:#{reply_count}: #{@local_romad_host.hostname}"]
    fields.concat(@romads.keys) unless initial
    msg = fields.join(' ')
    log [:msg, msg, @romads]
    msg
  end

  # Listens on @port, joined to the configured multicast group, and runs
  # commands received from other hosts. Blocks forever.
  def server_loop
    sock = UDPSocket.new
    sock.bind('0.0.0.0', @port)
    # ip_mreq: the group address followed by the local interface (any).
    # Use the configured group so that -m affects receiving, not just sending.
    mreq = IPAddr.new(@multi_addr).hton + IPAddr.new('0.0.0.0').hton
    sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, mreq)
    log 'start_listen'
    Socket.udp_server_loop_on([sock]) do |msg, msg_src|
      log [:received, msg, msg_src]
      src_ip = msg_src.remote_address.ip_address
      run_command(src_ip, msg) if from_remote?(src_ip)
    end
  end

  # Periodically expires silent hosts and sends heartbeats. An error in one
  # pass is reported and the loop keeps running. Previously a single error
  # stopped heartbeats and expiry for good.
  def heartbeat_loop
    loop do
      begin
        heartbeat_once
      rescue => e
        $stderr.puts "heartbeat failed: #{e.inspect}"
      end
      sleep(HEARTBEAT_LOOP_INTERVAL)
    end
  end

  # One heartbeat pass over a snapshot of the known hosts. Hosts silent for
  # TIMEOUT_SECONDS are dropped first, so a failing send cannot block expiry.
  # Then hosts silent for HEARTBEAT_SECONDS get update_message(0). Hosts that
  # reported no ports get update_message(1), which asks them for a reply.
  def heartbeat_once
    now = Time.now
    expired, live = snapshot_remote_servers.partition do |_ipaddr, host|
      host.updated_at + TIMEOUT_SECONDS < now
    end

    @mutex.synchronize do
      expired.each do |ipaddr, host|
        # Drop only the entry judged stale; a fresh update may have replaced it.
        @remote_servers.delete(ipaddr) if @remote_servers[ipaddr].equal?(host)
      end
    end

    live.each do |ipaddr, host|
      if host.updated_at + HEARTBEAT_SECONDS < now
        unicast(ipaddr, update_message(0))
      elsif host.ports.empty?
        unicast(ipaddr, update_message(1))
      end
    end
  end

  # Copy of @remote_servers taken under @mutex, safe to iterate while the
  # server thread keeps inserting entries.
  def snapshot_remote_servers
    @mutex.synchronize { @remote_servers.dup }
  end

  # On INT/TERM/HUP, forwards the signal to the spawned romads and exits(1).
  def set_trap
    [:INT, :TERM, :HUP].each do |sig|
      Signal.trap(sig) do
        kill_romads(sig)
        exit(1)
      end
    end
  end

  # Sends +sig+ to each spawned romad. Each pid is signalled on its own so
  # that one child that already exited does not stop the rest from being
  # signalled, and an empty @romads is not an error.
  def kill_romads(sig)
    @romads.values.each do |pid|
      begin
        Process.kill(sig, pid)
      rescue Errno::ESRCH
        # child already gone
      end
    end
  end
  private :heartbeat_once, :snapshot_remote_servers, :kill_romads

  # Installs signal handlers and starts the listener and heartbeat threads.
  def prepare_to_start
    set_trap
    @threads << Thread.start { server_loop }
    @threads << Thread.start { heartbeat_loop }
  end
  private :prepare_to_start

  # Starts the background threads, then the local romads as a new ring.
  def start_new_ring
    prepare_to_start
    spawn_new_roma_ring
  end

  # First known remote host that has reported at least one romad port, or nil.
  def choose_rhost
    snapshot_remote_servers.values.find { |rhost| !rhost.ports.empty? }
  end

  # Starts the background threads, then the local romads joining host_port.
  def start_join(host, port)
    prepare_to_start
    sleep(0.2) # presumably gives server_loop time to bind; verify
    spawn_romads_join(host, port)
  end

  # Multicasts an initial update and waits about 3 seconds for replies.
  # Then joins the first responding host that has romad ports. If none
  # answered, it prints an error and exits(1).
  def start_discover
    prepare_to_start
    sleep(0.2) # presumably gives server_loop time to bind; verify
    multicast(update_message(1, true))
    10.times { sleep(0.3) }
    rhost = choose_rhost
    if rhost
      spawn_romads_join(rhost.hostname, rhost.ports.first)
    else
      $stderr.puts "no server responded"
      exit 1
    end
  end

  # Blocks until the background threads finish.
  def join
    @threads.each(&:join)
  end

  # Sends +msg+ as one UDP datagram to ipaddr:@port on a short-lived socket.
  def unicast(ipaddr, msg)
    log [:message, ipaddr, msg]
    sock = UDPSocket.new
    begin
      sock.connect(ipaddr, @port)
      sock.sendmsg(msg)
    ensure
      sock.close
    end
  end

  # Sends +msg+ to the configured multicast group.
  def multicast(msg)
    unicast(@multi_addr, msg)
  end
end

# Command-line options. Chosen values are collected into +conf+, which is
# later passed to Tribunus.new as its options hash.
opt = OptionParser.new
conf = {}
opt.on('-d', 'discover the node by multicast [default]') do |_v|
  conf[:mode] = :discover
end
opt.on('-c', 'create new ring') do |_v|
  conf[:mode] = :new_ring
end
# Accepts HOST_PORT or HOST:PORT. The split is at the LAST '_' or ':' before
# the trailing digits, so hostnames containing '_' (e.g. my_host_10001) stay
# intact. The option pattern has no capture groups on purpose: with captures,
# OptionParser would pass the block an Array instead of the matched String.
opt.on('-j HOST:PORT', /\A.+?[_:]\d+\Z/, 'join the specified romad node') do |v|
  conf[:mode] = :join
  host, port = /\A(.+)[_:](\d+)\z/.match(v).captures
  conf[:joining_node] = [host, port.to_i]
end

opt.on('-p UDP_PORT', /\A\d+\Z/, 'the port for multicast') do |v|
  conf[:udp_port] = v.to_i
end
opt.on('-w WORKING_DIR', 'the directory where romads run') do |v|
  conf[:romad_work_dir] = v
end
opt.on('-m MULTICAST_ADDRESS', /\A\d+\.\d+\.\d+\.\d+\Z/, 'the ip address for multicast') do |v|
  conf[:multicast_addr] = v
end
opt.on('-r RUBY_COMMAND', 'name of ruby interpreter (default: "ruby")') do |v|
  conf[:ruby_command_name] = v
end
opt.on('-v', '--verbose') do |v|
  conf[:verbose] = v
end
opt.banner += " hostname port_range"

opt.parse!(ARGV)

# Positional arguments: the hostname for the local romads and a port range
# written as FROM-TO (between 2 and 100 ports inclusive).
unless ARGV.size == 2
  puts opt.help
  exit 1
end
romad_hostname, port_range = ARGV
first_port, last_port = port_range.split('-')
romad_ports = (first_port.to_i..last_port.to_i).to_a
if romad_ports.size < 2
  puts 'less ports'
  exit 1
elsif romad_ports.size > 100
  puts 'too many ports'
  exit 1
end

tribunus = Tribunus.new(romad_hostname, romad_ports, conf)
case conf[:mode]
when :new_ring then tribunus.start_new_ring
when :join then tribunus.start_join(*conf[:joining_node])
else tribunus.start_discover
end
tribunus.join