lib/roma/async_process.rb
require 'thread'
require 'digest/sha1'
require 'timeout'
module Roma
# A unit of deferred work for the async worker loops.
#
# Carries the event name, its arguments, an optional completion callback and
# the retry bookkeeping (at most 10 retries, 0.1 second apart).
class AsyncMessage
  attr_accessor :event, :args, :callback

  # ev - event name
  # ag - arguments for the event (defaults to nil)
  # cb - optional block stored as the completion callback
  def initialize(ev, ag = nil, &cb)
    @event = ev
    @args = ag
    @callback = cb
    @retry_count = 0
    @retry_max = 10
    @retry_wait = 0.1
  end

  # True while the retry budget has not been used up.
  def retry?
    @retry_count < @retry_max
  end

  # Records one more attempt and returns the new attempt count.
  def incr_count
    @retry_count = @retry_count + 1
  end

  # Sleeps for the configured interval between attempts.
  def wait
    sleep(@retry_wait)
  end
end
module AsyncProcess
# Queue of messages consumed by async_process_loop. Stored in a class
# variable of this module.
@@async_queue = Queue.new
# Second queue, consumed by async_process_loop_for_latency.
@@async_queue_latency = Queue.new
# Returns the queue drained by async_process_loop.
def self.queue
@@async_queue
end
# Returns the queue drained by async_process_loop_for_latency.
def self.queue_latency
@@async_queue_latency
end
# Starts the two background worker threads: one running async_process_loop
# and one running async_process_loop_for_latency. Both threads get this
# method's name in their :name thread-local.
#
# A StandardError raised while starting the threads is logged, not re-raised.
def start_async_process
  worker_name = __method__
  @async_thread = Thread.new { async_process_loop }
  @async_thread[:name] = worker_name
  @async_thread_latency = Thread.new { async_process_loop_for_latency }
  @async_thread_latency[:name] = worker_name
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
private
# Stops both worker threads. Each queue is given up to 100 polls, 0.1 second
# apart (about 10 seconds), to become empty before its thread is terminated.
def stop_async_process
  wait_until_drained = lambda do |queue|
    100.times do
      break if queue.empty?
      sleep 0.1
    end
  end

  wait_until_drained.call(@@async_queue)
  @async_thread.exit

  wait_until_drained.call(@@async_queue_latency)
  @async_thread_latency.exit
end
# Worker loop for the general async queue (@@async_queue). Never returns.
def async_process_loop
  consume_async_queue(@@async_queue, __method__)
end

# Worker loop for the latency queue (@@async_queue_latency). Never returns.
def async_process_loop_for_latency
  consume_async_queue(@@async_queue_latency, __method__)
end

# Shared body of both worker loops (previously duplicated verbatim).
#
# Pops messages from +queue+ forever and dispatches each one to the handler
# method named "asyncev_<event>":
# * truthy handler result -> callback is called with (msg, true)
# * falsy result, retries left -> a short-lived thread waits, bumps the retry
#   count and pushes the message back onto the same queue
# * falsy result, no retries left -> logs an error and calls the callback
#   with (msg, false)
#
# queue       - the Queue to drain
# thread_name - value stored in the :name thread-local of re-queue threads
#
# A StandardError raised by a handler or callback is logged and the loop
# restarts; the message being handled at that moment is not re-queued.
def consume_async_queue(queue, thread_name)
  loop do
    while msg = queue.pop
      if send("asyncev_#{msg.event}", msg.args)
        msg.callback.call(msg, true) if msg.callback
      elsif msg.retry?
        # Delay the retry on a separate thread so the worker is not blocked.
        t = Thread.new do
          msg.wait
          msg.incr_count
          queue.push(msg)
        end
        t[:name] = thread_name
      else
        @log.error("async process retry out:#{msg.inspect}")
        msg.callback.call(msg, false) if msg.callback
      end
    end
  end
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  retry
end
# Event handler: broadcasts a command from a background thread.
#
# args - [cmd, nids, tout]; "\r\n" is appended to cmd before it is passed to
#        async_broadcast_cmd together with nids and tout.
#
# Always returns true; the broadcast result is not checked.
def asyncev_broadcast_cmd(args)
  @log.debug("#{__method__} #{args.inspect}")
  command, node_ids, timeout_sec = args
  worker = Thread.new { async_broadcast_cmd("#{command}\r\n", node_ids, timeout_sec) }
  worker[:name] = __method__
  true
end
# Event handler: starts join_process on a background thread unless a join,
# recover or balance process is already flagged as running in @stats.
#
# Sets @stats.run_join while the thread runs; when it finishes, run_join is
# cleared and @stats.join_ap is set to nil.
#
# Always returns true.
def asyncev_start_join_process(_args)
  @log.debug(__method__)
  if @stats.run_join
    @log.error("#{__method__}:join process running")
  elsif @stats.run_recover
    @log.error("#{__method__}:recover process running")
  elsif @stats.run_balance
    @log.error("#{__method__}:balance process running")
  else
    @stats.run_join = true
    worker = Thread.new do
      begin
        join_process
      rescue => e
        @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
      ensure
        @stats.run_join = false
        @stats.join_ap = nil
      end
    end
    worker[:name] = __method__
  end
  true
end
# Event handler: starts balance_process on a background thread unless a
# join, recover or balance process is already flagged as running in @stats.
#
# Sets @stats.run_balance while the thread runs and clears it afterwards.
#
# Always returns true.
def asyncev_start_balance_process(_args)
  @log.debug(__method__)
  if @stats.run_join
    @log.error("#{__method__}:join process running")
  elsif @stats.run_recover
    @log.error("#{__method__}:recover process running")
  elsif @stats.run_balance
    @log.error("#{__method__}:balance process running")
  else
    @stats.run_balance = true
    worker = Thread.new do
      begin
        balance_process
      rescue => e
        @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
      ensure
        @stats.run_balance = false
      end
    end
    worker[:name] = __method__
  end
  true
end
# Event handler: sends an "rset" command carrying a value to node +nid+.
#
# args - [nid, hname, k, d, clk, expt, v]
#
# Returns true when the command was accepted, or when +nid+ is no longer in
# the routing table (nothing to retry). Returns false, which requests a
# retry, when the reply is nil or starts with 'ERROR'.
def asyncev_redundant(args)
  nid, hname, k, d, clk, expt, v = args
  @log.debug("#{__method__} #{args.inspect}")
  if @rttable.nodes.include?(nid)
    reply = async_send_cmd(nid, "rset #{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}\r\n#{v}\r\n", 10)
    return true unless reply.nil? || reply.start_with?('ERROR')
    @log.warn("async redundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length} -> #{nid}")
    false # retry
  else
    @log.warn("async redundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{v.length}")
    true # no retry
  end
end
# Event handler: sends an "rzset" command carrying +zv+ to node +nid+.
#
# args - [nid, hname, k, d, clk, expt, zv]
#
# Returns true when the command was accepted, or when +nid+ is no longer in
# the routing table (nothing to retry). Returns false, which requests a
# retry, when the reply is nil or starts with 'ERROR'.
def asyncev_zredundant(args)
  nid, hname, k, d, clk, expt, zv = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async zredundant failed:#{nid} does not found in routing table.#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rzset #{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length}\r\n#{zv}\r\n", 10)
  if res.nil? || res.start_with?('ERROR')
    # The payload variable here is zv; referencing the undefined local `v`
    # raised NameError, so the retry was never requested.
    @log.warn("async zredundant failed:#{k}\e#{hname} #{d} #{clk} #{expt} #{zv.length} -> #{nid}")
    return false # retry
  end
  true
end
# Event handler: sends an "rdelete" command to node +nid+.
#
# args - [nid, hname, k, clk]
#
# Returns true when a reply was received, or when +nid+ is no longer in the
# routing table (nothing to retry). Returns false, which requests a retry,
# when async_send_cmd returns nil/false.
def asyncev_rdelete(args)
  nid, hname, k, clk = args
  @log.debug("#{__method__} #{args.inspect}")
  unless @rttable.nodes.include?(nid)
    @log.warn("async rdelete failed:#{nid} does not found in routing table.#{k}\e#{hname} #{clk}")
    return true # no retry
  end
  res = async_send_cmd(nid, "rdelete #{k}\e#{hname} #{clk}\r\n", 10)
  unless res
    # The message used to say "async redundant failed" (copied from
    # asyncev_redundant), which misattributed the failing command.
    @log.warn("async rdelete failed:#{k}\e#{hname} #{clk} -> #{nid}")
    return false # retry
  end
  true
end
# Event handler: starts sync_a_vnode on a background thread.
#
# args - [vn, nid, p]; vn is converted with to_i and p is treated as the
#        primary flag when it equals the string 'true'.
#
# When @stats.run_iterate_storage is already set, only a warning is logged
# and the logger's return value is returned. Otherwise the flag is set for
# the lifetime of the thread and this method's name is returned.
def asyncev_reqpushv(args)
  vn, nid, p = args
  @log.debug("#{__method__} #{args.inspect}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_iterate_storage = true
  worker = Thread.new do
    begin
      sync_a_vnode(vn.to_i, nid, p == 'true')
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
    end
  end
  worker[:name] = __method__
end
# Event handler: starts acquired_recover_process on a background thread.
#
# Return value:
# * true when a join or balance process is flagged as running (no retry)
# * false when a recover process is already running (requests a retry)
# * this method's name (truthy) after the thread has been started
#
# @stats.run_recover is set while the thread runs and cleared afterwards.
def asyncev_start_recover_process(args)
  @log.debug("#{__method__} #{args.inspect}")
  if @stats.run_join
    @log.error("#{__method__}:join process running")
    true
  elsif @stats.run_recover
    @log.error("#{__method__}:recover process running.")
    false
  elsif @stats.run_balance
    @log.error("#{__method__}:balance process running")
    true
  else
    @stats.run_recover = true
    worker = Thread.new do
      begin
        acquired_recover_process
      rescue => e
        @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
      ensure
        @stats.run_recover = false
      end
    end
    worker[:name] = __method__
  end
end
# Event handler: arms the auto-recover countdown on a background thread.
#
# Returns false when a recover process is already running, true when a
# balance process is running, and otherwise this method's name (the value of
# the final assignment).
#
# The thread polls once per second inside
# Timeout.timeout(@rttable.auto_recover_time). If the status leaves
# 'preparing', or a recover/balance process starts, the loop ends normally
# and auto recover is skipped. If the block raises, the rescue branch acts
# according to @rttable.lost_action.
# NOTE(review): the rescue is bare, so any StandardError raised inside the
# polling loop -- not only the timeout -- takes the recovery branch; confirm
# this is intended.
def asyncev_start_auto_recover_process(args)
@log.debug("#{__method__} #{args.inspect}")
# run_join cannot be true in this case, so it is not checked.
# if @stats.run_join
# @log.error("#{__method__}:join process running")
# return true
# end
if @stats.run_recover
@log.error("#{__method__}:recover process running.")
return false
end
if @stats.run_balance
@log.error("#{__method__}:balance process running")
return true
end
@rttable.auto_recover_status = 'preparing'
t = Thread.new do
begin
# Wait out the grace period; leaving the loop early cancels auto recover.
Timeout.timeout(@rttable.auto_recover_time)do
loop do
sleep 1
break if @rttable.auto_recover_status != 'preparing'
# break if @stats.run_join # run_join cannot be true in this case.
break if @stats.run_recover
break if @stats.run_balance
end
end
@log.debug('inactivated AUTO_RECOVER')
rescue
# Reached when the block above raised (normally the timeout expiring).
case @rttable.lost_action
when :auto_assign, :shutdown
@stats.run_recover = true
@rttable.auto_recover_status = 'executing'
begin
@log.debug('auto recover start')
acquired_recover_process
rescue => e
@log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
ensure
# Always clear the running flag and go back to 'waiting'.
@stats.run_recover = false
@rttable.auto_recover_status = 'waiting'
end
when :no_action
@log.debug('auto recover NOT start. Because lost action is [no_action]')
end
end
end
t[:name] = __method__
end
# Event handler: starts release_process on a background thread.
#
# When @stats.run_iterate_storage is already set, only a warning is logged
# and the logger's return value is returned. Otherwise run_release,
# run_iterate_storage and spushv_protection are set to true, the thread is
# started and this method's name is returned. When the thread ends,
# run_iterate_storage and run_release are cleared; spushv_protection is not
# reset here.
def asyncev_start_release_process(args)
  @log.debug("#{__method__} #{args}")
  return @log.warn("#{__method__}:already be iterated storage process.") if @stats.run_iterate_storage

  @stats.run_release = true
  @stats.run_iterate_storage = true
  @stats.spushv_protection = true
  worker = Thread.new do
    begin
      release_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.run_iterate_storage = false
      @stats.run_release = false
    end
  end
  worker[:name] = __method__
end
# Repeatedly asks other nodes to push vnodes to this node.
#
# The loop ends when @do_acquired_recover_process is cleared, when the third
# element of @rttable.num_of_vn(ap_str) is 0 (short vnodes), when no vnode
# is selected, or when req_push_a_vnode returns false. A :rejected reply
# adds an extra one-second pause before the regular one-second pause.
#
# Errors are logged, not raised; the flag is always cleared on exit.
def acquired_recover_process
  @log.info("#{__method__}:start")
  excluded = @rttable.exclude_nodes_for_recover(@stats.ap_str, @stats.rep_host)
  @do_acquired_recover_process = true
  while @do_acquired_recover_process
    break if @rttable.num_of_vn(@stats.ap_str)[2] == 0 # short vnodes
    vn, nodes, is_primary = @rttable.select_vn_for_recover(excluded, @stats.rep_host)
    break unless vn
    next if nodes.length == 0

    case req_push_a_vnode(vn, nodes[0], is_primary)
    when :rejected then sleep 1
    when false then break
    end
    sleep 1
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_acquired_recover_process = false
end
# Pulls vnodes onto this node while @rttable.vnode_balance reports :less.
#
# At most @rttable.v_idx.length successful requests are made. A :rejected
# reply is followed by a 5 second pause and does not count; any other reply
# is followed by a 1 second pause and counts. Returns false right away when
# no vnode can be selected. Errors are logged, not raised. The completion
# message is logged and @do_join_process is cleared on every exit path.
def join_process
  @log.info("#{__method__}:start")
  requested = 0
  vnode_total = @rttable.v_idx.length
  excluded = @rttable.exclude_nodes_for_join(@stats.ap_str, @stats.rep_host)
  @do_join_process = true
  loop do
    break unless @rttable.vnode_balance(@stats.ap_str) == :less && requested < vnode_total
    break unless @do_join_process
    vn, nodes, is_primary = @rttable.select_vn_for_join(excluded, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end
    if req_push_a_vnode(vn, nodes[0], is_primary) == :rejected
      sleep 5
    else
      sleep 1
      requested += 1
    end
  end
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @log.info("#{__method__} has done.")
  @do_join_process = false
end
# Pulls vnodes onto this node while @rttable.vnode_balance reports :less.
#
# At most @rttable.v_idx.length successful requests are made. A :rejected
# reply is followed by a 5 second pause and does not count; any other reply
# is followed by a 1 second pause and counts. Returns false right away
# (without the completion log line) when no vnode can be selected. Errors
# are logged, not raised; @do_balance_process is always cleared on exit.
def balance_process
  @log.info("#{__method__}:start")
  requested = 0
  vnode_total = @rttable.v_idx.length
  excluded = @rttable.exclude_nodes_for_balance(@stats.ap_str, @stats.rep_host)
  @do_balance_process = true
  loop do
    break unless @rttable.vnode_balance(@stats.ap_str) == :less && requested < vnode_total
    break unless @do_balance_process
    vn, nodes, is_primary = @rttable.select_vn_for_balance(excluded, @stats.rep_host)
    unless vn
      @log.warn("#{__method__}:vnode does not found")
      return false
    end
    if req_push_a_vnode(vn, nodes[0], is_primary) == :rejected
      sleep 5
    else
      sleep 1
      requested += 1
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e.inspect} #{$ERROR_POSITION}")
ensure
  @do_balance_process = false
end
# Asks +src_nid+ to push vnode +vn+ to this node, then waits until this node
# appears in the routing entry for +vn+.
#
# vn         - vnode id
# src_nid    - node id asked to push the vnode
# is_primary - forwarded as the last field of the "reqpushv" command
#
# Returns:
# * true      - the request was accepted and the vnode arrived in time
# * :rejected - the peer answered "REJECTED" or sent an unexpected reply
# * :timeout  - the vnode did not arrive within
#               @stats.reqpushv_timeout_count polls of 0.1 second
# * false     - an exception occurred; it is logged and
#               @rttable.proc_failed(src_nid) is called
def req_push_a_vnode(vn, src_nid, is_primary)
  con = Roma::Messaging::ConPool.instance.get_connection(src_nid)
  con.write("reqpushv #{vn} #{@stats.ap_str} #{is_primary}\r\n")
  res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n' | 'ERROR\r\n'
  if res == "REJECTED\r\n"
    @log.warn("#{__method__}:request was rejected from #{src_nid}.")
    Roma::Messaging::ConPool.instance.return_connection(src_nid, con)
    return :rejected
  elsif res != "PUSHED\r\n"
    @log.warn("#{__method__}:#{res}")
    # The connection is in an unknown state, so it is not returned to the
    # pool. Close it so the socket is not leaked.
    con.close
    return :rejected
  end
  Roma::Messaging::ConPool.instance.return_connection(src_nid, con)
  # waiting for pushv
  count = 0
  while @rttable.search_nodes(vn).include?(@stats.ap_str) == false && count < @stats.reqpushv_timeout_count
    sleep 0.1
    count += 1
  end
  if count >= @stats.reqpushv_timeout_count
    @log.warn("#{__method__}:request has been time-out.vn=#{vn} nid=#{src_nid}")
    return :timeout
  end
  true
rescue => e
  @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  @rttable.proc_failed(src_nid)
  false
end
# Moves every vnode held by this node (@stats.ap_str) to other nodes.
#
# Runs until @rttable.has_node?(ap_str) becomes false or
# @do_release_process is cleared. Errors are logged, not raised. On every
# exit path the flag is cleared and all pooled connections are closed.
def release_process
@log.info("#{__method__}:start.")
# NOTE(review): a truthy can_i_release? aborts the release here. Presumably
# it reports that too few nodes would remain -- confirm against its
# definition.
if @rttable.can_i_release?(@stats.ap_str, @stats.rep_host)
@log.error("#{__method__}:Sufficient nodes do not found.")
return
end
@do_release_process = true
while @rttable.has_node?(@stats.ap_str)
break unless @do_release_process
@rttable.each_vnode do |vn, nids|
break unless @do_release_process
if nids.include?(@stats.ap_str)
to_nid, new_nids = @rttable.select_node_for_release(@stats.ap_str, @stats.rep_host, nids)
res = sync_a_vnode_for_release(vn, to_nid, new_nids)
if res == :abort
# Clearing the flag makes the next iteration's checks leave both loops.
@log.error("#{__method__}:release_process aborted due to SERVER_ERROR received.")
@do_release_process = false
end
if res == false
# redo re-runs this block for the same vnode; there is no retry limit.
@log.warn("#{__method__}:error at vn=#{vn} to_nid=#{to_nid} new_nid=#{new_nids}")
redo
end
end
end
end
@log.info("#{__method__} has done.")
rescue => e
@log.error("#{e}\n#{$ERROR_POSITION}")
ensure
@do_release_process = false
Roma::Messaging::ConPool.instance.close_all
end
# Copies vnode +vn+ to +to_nid+ and then switches its route to +new_nids+.
#
# Nothing is done (true is returned) when +to_nid+ is already listed for the
# vnode. Otherwise a routing transaction is opened, every storage is
# streamed to +to_nid+, the transaction is committed and a "setroute"
# command is broadcast.
#
# Returns true on success, :abort when a push reply starts with
# 'SERVER_ERROR', and false on any other failure (including exceptions,
# which are logged).
def sync_a_vnode_for_release(vn, to_nid, new_nids)
  nids = @rttable.search_nodes(vn)
  return true if nids.include?(to_nid)

  @log.debug("#{__method__}:#{vn} #{to_nid}")
  # change routing data at the vnode and synchronize a data
  nids << to_nid
  return false unless @rttable.transaction(vn, nids)

  # synchronize a data
  @storages.each_key do |hname|
    res = push_a_vnode_stream(hname, vn, to_nid)
    next if res == 'STORED'
    @rttable.rollback(vn)
    @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
    return :abort if res.start_with?('SERVER_ERROR')
    return false
  end

  clk = @rttable.commit(vn)
  if clk == false
    @rttable.rollback(vn)
    @log.error("#{__method__}:routing table commit failed")
    return false
  end

  clk = @rttable.set_route(vn, clk, new_nids)
  clk, new_nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)
  cmd = new_nids.inject("setroute #{vn} #{clk - 1}") { |line, nn| line << " #{nn}" }
  res = async_broadcast_cmd("#{cmd}\r\n")
  @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")
  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  false
end
# Copies vnode +vn+ to +to_nid+ and updates the routing when needed.
#
# Routing is changed when +to_nid+ is not yet listed for the vnode, or when
# +is_primary+ is truthy and +to_nid+ is not the first node. In that case a
# routing transaction is opened, every storage is streamed, the transaction
# is committed, the node list is rebuilt with edit_nodes and a "setroute"
# command is broadcast. Otherwise only the data is streamed.
#
# Returns true on success and false on any failure (exceptions are logged).
def sync_a_vnode(vn, to_nid, is_primary = nil)
  nids = @rttable.search_nodes(vn)
  needs_reroute = !nids.include?(to_nid) || (is_primary && nids[0] != to_nid)

  unless needs_reroute
    # synchronize a data
    @storages.each_key do |hname|
      res = push_a_vnode_stream(hname, vn, to_nid)
      next if res == 'STORED'
      @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
      return false
    end
    return true
  end

  @log.debug("#{__method__}:#{vn} #{to_nid} #{is_primary}")
  # change routing data at the vnode and synchronize a data
  nids << to_nid
  return false unless @rttable.transaction(vn, nids)

  # synchronize a data
  @storages.each_key do |hname|
    res = push_a_vnode_stream(hname, vn, to_nid)
    next if res == 'STORED'
    @rttable.rollback(vn)
    @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
    return false
  end

  clk = @rttable.commit(vn)
  if clk == false
    @rttable.rollback(vn)
    @log.error("#{__method__}:routing table commit failed")
    return false
  end

  nids = edit_nodes(nids, to_nid, is_primary)
  clk = @rttable.set_route(vn, clk, nids)
  clk, nids = @rttable.search_nodes_with_clk(vn) unless clk.is_a?(Integer)
  cmd = nids.inject("setroute #{vn} #{clk - 1}") { |line, nn| line << " #{nn}" }
  res = async_broadcast_cmd("#{cmd}\r\n")
  @log.debug("#{__method__}:async_broadcast_cmd(#{cmd}) #{res}")
  true
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
  false
end
# Builds the node list for a vnode after +new_nid+ has received it.
#
# nodes      - current node ids; modified in place unless @rttable.rn is 1
# new_nid    - node id being added
# is_primary - when truthy new_nid goes to the front, otherwise to the end
#
# With @rttable.rn == 1 the result is simply [new_nid]. Otherwise new_nid is
# first removed from the list; if the remainder still holds rn or more
# entries, one is dropped: the last node on the same host as new_nid (host =
# the part of the id before the first ':' or '_'), or else the last node.
#
# Returns the resulting list.
def edit_nodes(nodes, new_nid, is_primary)
  return [new_nid] if @rttable.rn == 1

  # [node_a, node_b, new_nid] -> [node_a, node_b]
  nodes.delete(new_nid)

  if nodes.length >= @rttable.rn
    new_host = new_nid.split(/[:_]/)[0]
    same_host = nodes.select { |nid| nid.split(/[:_]/)[0] == new_host }
    # prefer dropping the last node on the same host, which keeps the
    # primary node when possible
    victim = same_host.empty? ? nodes.last : same_host.last
    nodes.delete(victim)
  end

  if is_primary
    nodes.unshift(new_nid) # [new_nid, node_a]
  else
    nodes.push(new_nid)    # [node_a, new_nid]
  end
  nodes
end
# Streams the contents of vnode +vn+ from storage +hname+ to node +nid+
# using the "spushv" command.
#
# Returns the peer's final reply with the line ending removed ('STORED' is
# what callers in this file treat as success), the peer's first reply when
# it is not "READY", 'CANCELED' when the transfer was interrupted or
# each_vn_dump returned false, or the exception message when an error was
# raised.
def push_a_vnode_stream(hname, vn, nid)
@log.debug("#{__method__}:hname=#{hname} vn=#{vn} nid=#{nid}")
stop_clean_up
con = Roma::Messaging::ConPool.instance.get_connection(nid)
@do_push_a_vnode_stream = true
con.write("spushv #{hname} #{vn}\r\n")
res = con.gets # READY\r\n or error string
if res != "READY\r\n"
con.close
# NOTE(review): when gets returned nil, chomp raises NoMethodError and the
# rescue below returns that exception's message instead.
return res.chomp
end
res_dump = @storages[hname].each_vn_dump(vn) do |data|
# The flag is not cleared inside this method; presumably another part of
# the program resets it to cancel a running transfer -- confirm.
unless @do_push_a_vnode_stream
con.close
@log.error("#{__method__}:canceled in hname=#{hname} vn=#{vn} nid=#{nid}")
return 'CANCELED'
end
con.write(data)
# Pause between chunks for the configured interval.
sleep @stats.stream_copy_wait_param
end
con.write("\0" * 20) # end of stream
res = con.gets # STORED\r\n or error string
Roma::Messaging::ConPool.instance.return_connection(nid, con)
res.chomp! if res
if res_dump == false
@log.error("#{__method__}:each_vn_dump in hname=#{hname} vn=#{vn} nid=#{nid}")
return 'CANCELED'
end
res
rescue => e
# The connection is neither closed nor returned to the pool on this path.
@log.error("#{e}\n#{$ERROR_POSITION}")
e.to_s
end
# Event handler: starts storage_clean_up_process on a background thread.
#
# When @stats.run_storage_clean_up is already set, an error is logged and
# nil is returned. Otherwise the flag is set and the thread is started; when
# the thread ends, @stats.last_clean_up is set to the current time and the
# flag is cleared. Returns this method's name after starting the thread.
def asyncev_start_storage_clean_up_process(_args)
  # @log.info("#{__method__}")
  if @stats.run_storage_clean_up
    @log.error("#{__method__}:already in being")
    return
  end

  @stats.run_storage_clean_up = true
  worker = Thread.new do
    begin
      storage_clean_up_process
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.last_clean_up = Time.now
      @stats.run_storage_clean_up = false
    end
  end
  worker[:name] = __method__
end
# Runs one clean-up pass over every storage.
#
# A map of vnode => role is built for the vnodes this node holds (:primary
# when this node is first in the list, otherwise :secondaryN with N being
# its index). Each storage's each_clean_up is then called with a cut-off of
# now minus Roma::Config::STORAGE_DELMARK_EXPTIME. For every key yielded the
# block answers false when the vnode is currently being received; otherwise
# it sends an "out" command to every node after the first one returned by
# search_nodes_for_write, counts the key and answers true.
#
# Finally the gathered logs in @rttable.logs are cleared when they are more
# than five minutes old and no gathering is in progress.
def storage_clean_up_process
  @log.info("#{__method__}:start")
  me = @stats.ap_str
  vnhash = {}
  @rttable.each_vnode do |vn, nids|
    next unless nids.include?(me)
    vnhash[vn] = nids[0] == me ? :primary : :"secondary#{nids.index(me)}"
  end

  cutoff = Time.now.to_i - Roma::Config::STORAGE_DELMARK_EXPTIME
  deleted = 0
  @storages.each_pair do |hname, st|
    break unless @stats.do_clean_up?
    st.each_clean_up(cutoff, vnhash) do |key, vn|
      # @log.debug("#{__method__}:key=#{key} vn=#{vn}")
      next false if @stats.run_receive_a_vnode.key?("#{hname}_#{vn}")

      nodes = @rttable.search_nodes_for_write(vn)
      if nodes && nodes.length > 1
        nodes.drop(1).each do |nid|
          res = async_send_cmd(nid, "out #{key}\e#{hname} #{vn}\r\n")
          @log.warn("send out command failed:#{key}\e#{hname} #{vn} -> #{nid}") unless res
          # @log.debug("#{__method__}:res=#{res}")
        end
      end
      deleted += 1
      @stats.out_count += 1
      true
    end
  end
  @log.info("#{__method__}:#{deleted} keys deleted.") if deleted > 0

  # delete @rttable.logs
  return false if @stats.gui_run_gather_logs || @rttable.logs.empty?
  gathered_time = @rttable.logs[0]
  # delete gathering log data after 5min
  @rttable.logs.clear if gathered_time.to_i < Time.now.to_i - (60 * 5)
ensure
  @log.info("#{__method__}:stop")
end
# Event handler: records one latency sample for a command.
#
# args - [latency, cmd]
#
# Per command, @stats.latency_data keeps the last 10 samples plus the
# current maximum and minimum. When @stats.latency_check_time_count is set
# and more than that many seconds have passed since the entry's 'time', the
# average/max/min are logged, the current values are moved to their
# 'past' slots and the window is reset.
#
# NOTE(review): the first-time initialisation relies on
# @stats.latency_data[cmd] returning a hash for an unknown cmd -- presumably
# via a Hash default block; confirm.
#
# Always returns true.
def asyncev_calc_latency_average(args)
  latency, cmd = args
  # @log.debug(__method__)
  unless @stats.latency_data.key?(cmd) # only first execute target cmd
    fresh = @stats.latency_data[cmd]
    fresh.store('latency', [])
    fresh.store('latency_max', {})
    fresh['latency_max'].store('current', 0)
    fresh.store('latency_min', {})
    fresh['latency_min'].store('current', 99_999)
    fresh.store('time', Time.now.to_i)
  end

  begin
    entry = @stats.latency_data[cmd]
    samples = entry['latency']
    samples.delete_at(0) if samples.length >= 10
    samples.push(latency)
    entry['latency_max']['current'] = latency if latency > entry['latency_max']['current']
    entry['latency_min']['current'] = latency if latency < entry['latency_min']['current']
  rescue => e
    @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
  ensure
    interval = @stats.latency_check_time_count
    if interval && Time.now.to_i - @stats.latency_data[cmd]['time'] > interval
      window = @stats.latency_data[cmd]
      values = window['latency']
      average = values.inject(0.0) { |sum, x| sum + x } / values.size
      max = window['latency_max']['current']
      min = window['latency_min']['current']
      @log.debug("Latency average[#{cmd}]: #{sprintf('%.8f', average)}"\
                 "(denominator=#{values.length}"\
                 " max=#{sprintf('%.8f', max)}"\
                 " min=#{sprintf('%.8f', min)})")
      window['time'] = Time.now.to_i
      window['latency_past'] = values
      window['latency'] = []
      window['latency_max']['past'] = max
      window['latency_max']['current'] = 0
      window['latency_min']['past'] = min
      window['latency_min']['current'] = 99_999
    end
  end
  true
end
# Event handler: flushes one database of a storage on a background thread.
#
# args - [hname, dn]
#
# The flush only starts when the database status is :safecopy_flushing;
# otherwise an error is logged. After flushing, the status is set to
# :safecopy_flushed. Always returns true.
def asyncev_start_storage_flush_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")
  st = @storages[hname]
  unless st.dbs[dn] == :safecopy_flushing
    @log.error("Can not flush storage. stat = #{st.dbs[dn]}")
    return true
  end

  worker = Thread.new do
    begin
      st.flush_db(dn)
      st.set_db_stat(dn, :safecopy_flushed)
      @log.info("#{__method__}:storage has flushed. (#{hname}, #{dn})")
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  worker[:name] = __method__
  true
end
# Event handler: starts storage_cachecleaning_process on a background thread.
#
# args - [hname, dn]
#
# The process only starts when the database status is :cachecleaning;
# otherwise an error is logged. Always returns true.
def asyncev_start_storage_cachecleaning_process(args)
  hname, dn = args
  @log.debug("#{__method__} #{args.inspect}")
  st = @storages[hname]
  unless st.dbs[dn] == :cachecleaning
    @log.error("Can not start cachecleaning process. stat = #{st.dbs[dn]}")
    return true
  end

  worker = Thread.new do
    begin
      storage_cachecleaning_process(hname, dn)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  worker[:name] = __method__
  true
end
# Moves cached entries of database +dn+ in storage +hname+ back into the db.
#
# Keys are fetched from the cache in batches until none remain or
# @do_storage_cachecleaning_process is cleared. Each entry of a batch is
# loaded with load_stream_dump_for_cachecleaning (counted as loaded or
# rejected) and then every key of the batch is removed from the cache.
#
# When the run was not interrupted, the database status is set to :normal.
# The flag is always cleared on exit.
def storage_cachecleaning_process(hname, dn)
  loaded = 0
  rejected = 0
  st = @storages[hname]
  @do_storage_cachecleaning_process = true
  loop do
    # get keys in a cache up to 100 keys
    keys = st.get_keys_in_cache(dn)
    break if keys.nil? || keys.length == 0
    break unless @do_storage_cachecleaning_process
    # copy cache -> db
    st.each_cache_by_keys(dn, keys) do |vn, last, clk, expt, k, v|
      break unless @do_storage_cachecleaning_process
      stored = st.load_stream_dump_for_cachecleaning(vn, last, clk, expt, k, v)
      stored ? loaded += 1 : rejected += 1
    end
    # remove keys in a cache
    keys.each { |key| st.out_cache(dn, key) }
  end

  if @do_storage_cachecleaning_process == false
    @log.warn("#{__method__}:uncompleted")
  else
    st.set_db_stat(dn, :normal)
  end
  @log.debug("#{__method__}:#{loaded} keys loaded.")
  @log.debug("#{__method__}:#{rejected} keys rejected.") if rejected > 0
ensure
  @do_storage_cachecleaning_process = false
end
# Event handler: runs get_routing_event on a background thread. Errors
# raised by it are logged. Returns this method's name.
def asyncev_start_get_routing_event(args)
  @log.debug("#{__method__} #{args}")
  worker = Thread.new do
    begin
      get_routing_event
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    end
  end
  worker[:name] = __method__
end
# Collects join/leave lines from this node's routing files.
#
# Every file under Config::RTTABLE_PATH whose name starts with
# @stats.ap_str is read line by line; lines matching /join|leave/ are
# appended (without the line ending) to @rttable.event, dropping the oldest
# entry first once @rttable.event_limit_line entries are held.
#
# Errors are logged, not raised.
def get_routing_event
  @log.info("#{__method__}:start.")
  pattern = "#{Config::RTTABLE_PATH}/#{@stats.ap_str}*"
  Dir.glob(pattern).each do |fname|
    IO.foreach(fname) do |line|
      next unless line =~ /join|leave/
      @rttable.event.shift if @rttable.event.size >= @rttable.event_limit_line
      @rttable.event << line.chomp
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
# Event handler: runs get_logs(args) on a background thread. Errors raised
# by it are logged and @stats.gui_run_gather_logs is cleared when the thread
# ends. Returns this method's name.
def asyncev_start_get_logs(args)
  @log.debug("#{__method__} #{args}")
  worker = Thread.new do
    begin
      get_logs(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      @stats.gui_run_gather_logs = false
    end
  end
  worker[:name] = __method__
end
# Reads a time range out of this node's log file into @rttable.logs.
#
# args - [start, end]; both are passed to get_point to obtain byte offsets.
#
# The bytes between the two offsets are split into lines (line endings
# removed, lines equal to '.' dropped) and stored in @rttable.logs with the
# current time prepended as the first element. On error the logs are set to
# an empty array and the error is logged. @stats.gui_run_gather_logs is
# always cleared on exit.
def get_logs(args)
  @log.debug("#{__method__}:start.")
  log_file = "#{Config::LOG_PATH}/#{@stats.ap_str}.log"
  target_logs = []
  File.open(log_file) do |f|
    from = get_point(f, args[0], 'start')
    to = get_point(f, args[1], 'end')
    ## read target logs
    f.seek(from, IO::SEEK_SET)
    raw = f.read(to - from)
    target_logs = raw.each_line.map(&:chomp)
    target_logs.delete('.')
  end
  @rttable.logs = target_logs
  # set gathered date for expiration
  @rttable.logs.unshift(Time.now)
  @log.debug("#{__method__} has done.")
rescue => e
  @rttable.logs = []
  @log.error("#{e}\n#{$ERROR_POSITION}")
ensure
  @stats.gui_run_gather_logs = false
end
# Finds the byte offset in log file +f+ that corresponds to +target_time+,
# using a bisection over 2048-byte sectors.
#
# f            - opened log (must support size, seek and read)
# target_time  - 'current', a "YYYY-MM-DDTHH:MM:SS" string, or (on recursive
#                calls) a Time
# type         - 'start' or 'end'; decides how out-of-range times are handled
# latency_time - time the search began (used for the 5 second limit)
# current_pos  - offset examined by the previous step
# new_pos      - offset examined by this step
#
# Returns an Integer offset. Raises RuntimeError when the search takes more
# than 5 seconds, when the requested time lies on the wrong side of the
# logged range, or when the bisection can no longer make progress.
def get_point(f, target_time, type, latency_time = Time.now, current_pos = 0, new_pos = f.size / 2)
  # hilatency check
  ps = Time.now - latency_time
  if ps > 5
    @log.warn('gather_logs process was failed.')
    fail
  end
  # initialize read size
  read_size = 2048
  stamp = /[IDEW],\s\[(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)\.(\d+)/
  # first check
  unless target_time.class == Time
    # in case of not set end_date
    return f.size if target_time == 'current'
    target_time =~ (/(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)/)
    target_time = Time.mktime(*Regexp.last_match.captures, 0)
    # check outrange or not
    f.seek(0, IO::SEEK_SET)
    begining_log = f.read(read_size)
    begining_log.index(stamp)
    begining_time = Time.mktime(*Regexp.last_match.captures)
    # Read the tail of the file. Seeking relative to the end by a fixed
    # 2048 bytes raised Errno::EINVAL for logs shorter than that, so clamp
    # the offset at the start of the file.
    f.seek([f.size - read_size, 0].max, IO::SEEK_SET)
    end_log = f.read(read_size)
    end_log.rindex(stamp)
    end_time = Time.mktime(*Regexp.last_match.captures)
    case type
    when 'start'
      if target_time < begining_time
        return 0
      elsif target_time > end_time
        @log.error('irregular time was set.')
        fail
      end
    when 'end'
      if target_time > end_time
        return f.size
      elsif target_time < begining_time
        @log.error('irregular time was set.')
        fail
      end
    end
  end
  # read half sector size
  f.seek(new_pos, IO::SEEK_SET)
  sector_log = f.read(read_size)
  # grep date
  time_a = sector_log.scan(stamp).map { |parts| Time.mktime(*parts) }
  sector_time_first = time_a[0]
  sector_time_last = time_a[-1]
  if target_time.between?(sector_time_first, sector_time_last)
    time_a.each do |time|
      if target_time <= time
        time_string = time.strftime('%Y-%m-%dT%H:%M:%S')
        target_index = sector_log.index(/[IDEW],\s\[#{time_string}/)
        return new_pos + target_index
      end
    end
  elsif sector_time_first > target_time
    target_pos = new_pos - ((new_pos - current_pos).abs / 2)
  elsif sector_time_first < target_time
    target_pos = new_pos + ((new_pos - current_pos).abs / 2)
  end
  # Once the step shrinks to zero the next call would repeat this one
  # exactly, recursing until the time limit or the stack is exhausted.
  # Fail right away instead.
  if target_pos == new_pos
    @log.warn('gather_logs process was failed.')
    fail
  end
  get_point(f, target_time, type, latency_time, new_pos, target_pos)
end
# Event handler: runs replicate_existing_data_process(args) on a background
# thread.
#
# args - [$roma.cr_writer.replica_rttable]
#
# $roma.cr_writer.run_existing_data_replication is set to true inside the
# thread before the process starts and set back to false when it ends.
# Errors are logged. Returns this method's name.
def asyncev_start_replicate_existing_data_process(args)
  worker = Thread.new do
    begin
      $roma.cr_writer.run_existing_data_replication = true
      replicate_existing_data_process(args)
    rescue => e
      @log.error("#{__method__}:#{e.inspect} #{$ERROR_POSITION}")
    ensure
      $roma.cr_writer.run_existing_data_replication = false
    end
  end
  worker[:name] = __method__
end
# Streams every vnode of every storage to the replica nodes listed for it.
#
# args - array whose first element responds to v_idx, mapping a vnode id to
#        the replica node ids that should receive it
#
# Before each vnode, $roma.cr_writer.run_existing_data_replication is
# checked; when it is no longer set a RuntimeError is raised, which is
# logged by the rescue below. Returns false as soon as one push does not
# answer 'STORED'. Errors are logged, not raised.
def replicate_existing_data_process(args)
  @log.info("#{__method__} :start.")
  @storages.each_key do |hname|
    @rttable.v_idx.each_key do |vn|
      raise unless $roma.cr_writer.run_existing_data_replication
      args[0].v_idx[vn].each do |replica_nid|
        res = push_a_vnode_stream(hname, vn, replica_nid)
        next if res == 'STORED'
        @log.error("#{__method__}:push_a_vnode was failed:hname=#{hname} vn=#{vn}:#{res}")
        return false
      end
    end
  end
  @log.info("#{__method__} has done.")
rescue => e
  @log.error("#{e}\n#{$ERROR_POSITION}")
end
end # module AsyncProcess
end # module Roma