lib/roma/tools/recoverlost_lib.rb

Summary

Maintainability
D
1 day
Test Coverage
#!/usr/bin/env ruby
#
# usage:recoverlost address port storage-path [yyyymmddhhmmss]
#
require 'roma/client/sender'
require 'roma/messaging/con_pool'
require 'roma/routing/routing_data'

module Roma
  module Storage
  end
  Storage::autoload(:TCStorage,'roma/storage/tc_storage')
  Storage::autoload(:DbmStorage,'roma/storage/dbm_storage')
  Storage::autoload(:SQLite3Storage,'roma/storage/sqlite3_storage')

  class RecoverLost

    def initialize(pname, pushv_cmd, argv, alldata = false)
      if alldata == false && argv.length < 4
        puts "usage:#{pname} address port storage-path [yyyymmddhhmmss]"
        exit
      end

      if alldata && argv.length != 3
        puts "usage:#{pname} address port storage-path"
        exit
      end

      @addr = argv[0]
      @port = argv[1]
      @strgpath = argv[2]
      @ymdhms = argv[3]

      if @port =~ /\D/
        STDERR.puts "port was not numeric."
        exit
      end

      if @ymdhms && (@ymdhms.length != 14 || @ymdhms =~ /\D/)
        STDERR.puts "yyyymmddhhmmss format mismatch."
        exit
      end
      @pushv_cmd = pushv_cmd
      @nodeid = "#{@addr}_#{@port}"
      @stream_copy_wait_param = 0.0001
      @alldata = alldata
    end

    def suite
      @rd = get_routing_data(@nodeid)
      unless @alldata
        @lost_vnodes = get_lost_vnodes(@rd,@ymdhms)
        puts "#{@lost_vnodes.length} vnodes where data was lost."

        exit if @lost_vnodes.length == 0
      else
        @lost_vnodes = @rd.v_idx.keys
      end

      each_hash(@strgpath){|hname,dir|
        puts "#{hname} #{dir}"
        @storage = open_storage(dir,@lost_vnodes)
        start_recover(hname)
        @storage.closedb
      }
    end

    def suite_with_keys(keys)
      @rd = get_routing_data(@nodeid)
      @lost_vnodes = @rd.v_idx.keys

      each_hash(@strgpath){|hname,dir|
        puts "#{hname} #{dir}"
        @storage = open_storage(dir,@lost_vnodes)
        start_recover_width_keys(hname, keys)
#        start_recover_width_keys2(hname, keys)
        @storage.closedb
      }
    end

    def each_hash(path)
      Dir::glob("#{path}/*").each{|dir|
        next unless File::directory?(dir)
        hname = dir[dir.rindex('/')+1..-1]
        yield hname,dir
      }
    end

    def get_routing_data(nid)
      sender = Roma::Client::Sender.new
      sender.send_routedump_command(nid)
    end

    def get_lost_vnodes(rd,ymdhms)
      ret = rd.get_lost_vnodes
      if ymdhms
        ret |= get_history_of_lost(@nodeid,ymdhms)
      end
      ret
    end

    def get_history_of_lost(nid,ymdhms)
      ret = []
      con = Roma::Messaging::ConPool.instance.get_connection(nid)
      con.write("history_of_lost #{ymdhms}\r\n")
      while((buf = con.gets) != "END\r\n")
        ret << buf.chomp.to_i
      end
      Roma::Messaging::ConPool.instance.return_connection(nid, con)
      ret
    end

    def open_storage(path,vn_list)
      unless File::directory?(path)
        STDERR.puts "#{path} does not found."
        return nil
      end

      # get a file extension
      ext = File::extname(Dir::glob("#{path}/0.*")[0])[1..-1]
      # count a number of divided files
      divnum = Dir::glob("#{path}/*.#{ext}").length

      st = new_storage(ext)
      st.divnum = divnum
      st.vn_list = vn_list
      st.storage_path = path
      st.opendb
      st
    end

    def new_storage(ext)
      case(ext)
      when 'tc'
        return ::Roma::Storage::TCStorage.new
      when 'dbm'
        return Roma::Storage::DbmStorage.new
      when 'sql3'
        return Roma::Storage::SQLite3Storage.new
      else
        return nil
      end
    end

    def start_recover(hname)
      @lost_vnodes.each_with_index{|vn, idx|
        nodes = @rd.v_idx[vn]
        if nodes == nil || nodes.length == 0
          nids = []
          nids[0] = @rd.nodes[rand(@rd.nodes.length)]
          puts "#{idx}/#{@lost_vnodes.length} #{vn} assign to #{nids.inspect}"
        else
          nids = nodes
          puts "#{idx}/#{@lost_vnodes.length} #{vn} was auto assigned at #{nids.inspect}"
        end

        nids.each{|nid|
          if push_a_vnode_stream(hname, vn, nid)!="STORED"
            STDERR.puts "push_a_vnode_stream aborted in #{vn}"
            exit
          end
        }

        if nodes == nil || nodes.length == 0
          cmd = "setroute #{vn} #{@rd.v_clk[vn]} #{nids[0]}\r\n"
          exit unless send_cmd(nids[0] ,cmd)
          broadcast_cmd(cmd, nids[0])
        end
      }
    end

    def push_a_vnode_stream(hname, vn, nid)
      con = Roma::Messaging::ConPool.instance.get_connection(nid)

      con.write("#{@pushv_cmd} #{hname} #{vn}\r\n")

      res = con.gets # READY\r\n or error string
      if res != "READY\r\n"
        con.close
        return res.chomp
      end

      @storage.each_vn_dump(vn){|data|
        con.write(clk_to_zero(data))
        sleep @stream_copy_wait_param
      }
      con.write("\0"*20) # end of steram

      res = con.gets # STORED\r\n or error string
      Roma::Messaging::ConPool.instance.return_connection(nid,con)
      res.chomp! if res
      res
    rescue =>e
      STDERR.puts "#{e}\n#{$@}"
      nil
    end

    def make_node_hash(keys)
      res = {}
      @rd.nodes.each{|nid| res[nid] = [] }
      keys.each{|key|
        d = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
        @rd.v_idx[d & @rd.search_mask].each{|nid| res[nid] << key }
      }
      res
    end

    def start_recover_width_keys2(hname,keys)
      node_hash = make_node_hash(keys)
      node_hash.each{|nid,ks|
        puts nid
        upload_data2(hname, nid, ks)
      }
    end

    def upload_data2(hname, nid, keys)
      con = Roma::Messaging::ConPool.instance.get_connection(nid)

      cmd = "#{@pushv_cmd} #{hname} 0\r\n"
      con.write(cmd)
      res = con.gets # READY\r\n or error string
      if res != "READY\r\n"
        con.close
        return res.chomp
      end

      n = keys.length
      m = n / 100
      m = 1 if m < 1
      keys.each_with_index{|k,i|
        print "#{i}/#{n}\r" if i%m == 0
        data = @storage.get_raw2(k)
        next unless data
        d = Digest::SHA1.hexdigest(k).hex % (2**@rd.dgst_bits)
        vn = d & @rd.search_mask

        vn_old, last, clk, expt, val = data
        # puts "old vn = #{vn_old}"
        if val
          wd = [vn, last, 0, expt, k.length, k, val.length, val].pack("NNNNNa#{k.length}Na#{val.length}")
        else
          wd = [vn, last, 0, expt, k.length, k, 0].pack("NNNNNa#{k.length}N")
        end

        con.write(wd)
        sleep @stream_copy_wait_param
      }
      con.write("\0"*20) # end of steram

      res = con.gets # STORED\r\n or error string
      Roma::Messaging::ConPool.instance.return_connection(nid,con)
      res.chomp! if res
      res
    rescue =>e
      STDERR.puts "#{e}\n#{$@}"
      nil
    end

    def start_recover_width_keys(hname,keys)
      keys.each{|key|
        data = @storage.get_raw2(key)
        if data
          puts "hit => #{key}"
          d = Digest::SHA1.hexdigest(key).hex % (2**@rd.dgst_bits)
          vn = d & @rd.search_mask
          nodes = @rd.v_idx[vn]
          nodes.each{|nid|
            print "#{nid}=>"
            res = upload_data(hname, vn, nid, key, data)
            puts res
          }
        end
      }
    end

    def upload_data(hname, vn, nid, k, data)
      con = Roma::Messaging::ConPool.instance.get_connection(nid)

      cmd = "#{@pushv_cmd} #{hname} #{vn}\r\n"
      con.write(cmd)
# puts "new vn = #{vn}"
      res = con.gets # READY\r\n or error string
      if res != "READY\r\n"
        con.close
        return res.chomp
      end

      vn_old, last, clk, expt, val = data
# puts "old vn = #{vn_old}"
      if val
        wd = [vn, last, 0, expt, k.length, k, val.length, val].pack("NNNNNa#{k.length}Na#{val.length}")
      else
        wd = [vn, last, 0, expt, k.length, k, 0].pack("NNNNNa#{k.length}N")
      end

      con.write(wd)
      sleep @stream_copy_wait_param

      con.write("\0"*20) # end of steram

      res = con.gets # STORED\r\n or error string
      Roma::Messaging::ConPool.instance.return_connection(nid,con)
      res.chomp! if res
      res
    rescue =>e
      STDERR.puts "#{e}\n#{$@}"
      nil
    end

    def clk_to_zero(data)
      vn, last, clk, expt, klen = data.unpack('NNNNN')
      k, vlen = data[20..-1].unpack("a#{klen}N")
      if vlen != 0
        v, = data[(20+klen+4)..-1].unpack("a#{vlen}")
        [vn, last, 0, expt, klen, k, vlen, v].pack("NNNNNa#{klen}Na#{vlen}")
      else
        [vn, last, 0, expt, klen, k, 0].pack("NNNNNa#{klen}N")
      end
    end

    def broadcast_cmd(cmd,without_nids=nil)
      without_nids=[] unless without_nids
      res = {}
      @rd.nodes.each{ |nid|
        res[nid] = send_cmd(nid,cmd) unless without_nids.include?(nid)
      }
      res
    rescue => e
      STDERR.puts("#{e}\n#{$@}")
      nil
    end

    def send_cmd(nid, cmd)
      con = Roma::Messaging::ConPool.instance.get_connection(nid)
      con.write(cmd)
      res = con.gets
      Roma::Messaging::ConPool.instance.return_connection(nid, con)
      if res
        res.chomp!
      end
      res
    rescue => e
      STDERR.puts("#{__FILE__}:#{__LINE__}:Send command failed that node-id is #{nid},command is #{cmd}.")
      nil
    end

  end # class RecoverLost
end # module Roma