# lib/roma/routing/rttable.rb
#
# Summary
#
# Maintainability
# A
# 45 mins
# Test Coverage
require 'roma/logging/rlogger'
require 'roma/routing/routing_data'
require 'roma/routing/merkle_tree'
require 'yaml'
require 'json'

module Roma
  module Routing

    class RoutingTable

      # Routing data object handed to #initialize.
      attr_reader :rd
      # Bit mask copied from the routing data; #get_vnode_id ANDs hash values with it.
      attr_reader :search_mask
      # Hash of node ID => number of counted failures (missing keys read as 0).
      attr_reader :fail_cnt
      # MerkleTree filled from the vnode index by #init_mtree.
      attr_reader :mtree
      # 2 ** dgst_bits of the routing data.
      attr_reader :hbits
      # rn value copied from the routing data (reported as 'routing.redundant').
      attr_reader :rn
      # div_bits value copied from the routing data.
      attr_reader :div_bits
      # Failure count at which #proc_failed removes a node through #leave.
      attr_accessor :fail_cnt_threshold
      # Seconds that must have passed since the previously reported failure
      # before #proc_failed counts a new one.
      attr_accessor :fail_cnt_gap
      # Hash of netmask string => { :regexp, :replace }, consulted by #sub_nid_rd.
      attr_accessor :sub_nid

      # Builds a routing table around +rd+.
      #
      # +rd+:: routing data object; rn, div_bits, dgst_bits and search_mask are
      #        read here, v_idx is read by init_mtree.
      def initialize(rd)
        @rd = rd
        @log = Roma::Logging::RLogger.instance

        # Values cached from the routing data.
        @rn = rd.rn
        @div_bits = rd.div_bits
        @hbits = 2**rd.dgst_bits
        @search_mask = rd.search_mask

        # Failure bookkeeping used by proc_failed / proc_succeed.
        @fail_cnt = Hash.new(0)
        @fail_cnt_threshold = 5
        @fail_cnt_gap = 0
        @fail_time = Time.now

        @sub_nid = {}
        init_mtree
      end

      # Counts how the vnodes of the routing table relate to node +ap+.
      #
      # +ap+:: node ID to look for
      #
      # Returns [pn, sn, short, lost]:
      # pn::    number of vnodes whose first node is +ap+
      # sn::    array of rn - 1 counters; sn[k] counts vnodes listing +ap+ at
      #         position k + 1
      # short:: number of non-lost vnodes holding fewer than rn nodes
      #         (independent of +ap+)
      # lost::  number of vnodes whose node list is nil or empty
      def num_of_vn(ap)
        primary = short = lost = 0
        secondary = Array.new(@rd.rn - 1, 0)

        @rd.v_idx.each_value do |nids|
          if nids.nil? || nids.empty?
            lost += 1
            next
          end

          # One lookup gives both "is it there" and "at which position".
          pos = nids.index(ap)
          if pos == 0
            primary += 1
          elsif pos
            secondary[pos - 1] += 1
          end
          short += 1 if nids.length < @rd.rn
        end

        [primary, secondary, short, lost]
      end

      # Collects routing statistics for node +ap+ into a Hash keyed by
      # 'routing.*' strings.
      #
      # +ap+:: node ID passed on to num_of_vn for the primary/secondary counts
      #
      # Returns the Hash; keys are inserted in the order listed below.
      def get_stat(ap)
        primary, secondaries, short, lost = num_of_vn(ap)
        node_list = nodes

        stat = {
          'routing.redundant' => @rn,
          'routing.nodes.length' => node_list.length,
          'routing.nodes' => node_list.inspect,
          'routing.dgst_bits' => @rd.dgst_bits,
          'routing.div_bits' => @div_bits,
          'routing.vnodes.length' => vnodes.length,
          'routing.primary' => primary
        }
        # One entry per secondary position: routing.secondary1 .. secondary(rn-1).
        1.upto(@rn - 1) { |k| stat["routing.secondary#{k}"] = secondaries[k - 1] }
        stat['routing.short_vnodes'] = short
        stat['routing.lost_vnodes'] = lost
        stat['routing.fail_cnt_threshold'] = @fail_cnt_threshold
        stat['routing.fail_cnt_gap'] = @fail_cnt_gap
        stat['routing.sub_nid'] = @sub_nid.inspect
        stat
      end

      # Creates a fresh MerkleTree sized by the routing data's dgst_bits and
      # div_bits, then registers every vnode with its node list.
      def init_mtree
        @mtree = MerkleTree.new(@rd.dgst_bits, @rd.div_bits)
        @rd.v_idx.each_pair do |vn, nids|
          @mtree.set(vn, nids)
        end
      end

      # Returns a clone of the routing data's node list, so the caller gets
      # its own Array object rather than the one held by the routing data.
      def nodes
        @rd.nodes.clone
      end

      # Returns the vnode IDs, i.e. the keys of the routing data's vnode index.
      def vnodes
        @rd.v_idx.keys
      end

      # Maps a hash value to its vnode ID by masking it with the search mask.
      # +d+: hash value
      def get_vnode_id(d)
        d & @search_mask
      end

      # Returns a clone of the node ID list registered for a vnode.
      # +vn+: vnode id
      #
      # Any StandardError raised during the lookup is swallowed and nil is
      # returned instead.
      def search_nodes(vn)
        nids = @rd.v_idx[vn]
        nids.clone
      rescue StandardError
        nil
      end

      # Removes a dropping node from the routing table.
      # +nid+: dropping node
      #
      # The node is deleted from the node list and from every vnode's node
      # list; each vnode is then re-registered in the Merkle tree. A vnode left
      # without any node is reported through the error log. Finally the node's
      # failure counter is discarded; the value returned is that of
      # Hash#delete (the removed counter, or nil if there was none).
      def leave(nid)
        @rd.nodes.delete(nid)

        @rd.v_idx.each_pair do |vn, nids|
          nids.reject! { |member| member == nid }
          @log.error("Vnode data is lost.(Vnode=#{vn})") if nids.empty?
          @mtree.set(vn, nids)
        end

        @fail_cnt.delete(nid)
      end

      # Serializes the routing data with Marshal and returns the resulting
      # byte string.
      def dump
        Marshal.dump(@rd)
      end

      # Serializes the routing data with YAML.dump and returns the YAML text.
      def dump_yaml
        YAML.dump(@rd)
      end

      # Serializes the routing data as a JSON array of three elements:
      # a header object (dgst_bits, div_bits, rn), the node list, and the
      # vnode index.
      def dump_json
        header = {
          :dgst_bits => @rd.dgst_bits,
          :div_bits => @rd.div_bits,
          :rn => @rd.rn
        }
        JSON.generate([header, @rd.nodes, @rd.v_idx])
      end

      # Delegates to the routing data's own dump_binary and returns its result;
      # the binary layout is defined by the routing data class.
      def dump_binary
        @rd.dump_binary
      end

      # Tells whether any vnode lists two or more node IDs sharing the same
      # host part, the host part being the text before the first "_" of a
      # node ID.
      #
      # Returns true as soon as such a vnode is found, false otherwise.
      def check_repetition_in_routing
        @rd.v_idx.each_value.any? do |nids|
          hosts = nids.map { |nid| nid.split("_").first }
          hosts.uniq.length < hosts.length
        end
      end

      # Records a failure reported for node +nid+.
      #
      # The failure is counted only when more than fail_cnt_gap seconds have
      # passed since the previously reported failure (of any node); once the
      # node's counter reaches fail_cnt_threshold the node is removed with
      # leave. The time of this report is remembered in every case and is
      # also the return value.
      def proc_failed(nid)
        now = Time.now
        if now - @fail_time > @fail_cnt_gap
          failures = (@fail_cnt[nid] += 1)
          leave(nid) if failures >= @fail_cnt_threshold
        end
        @fail_time = now
      end

      # Forgets the failure counter of node +nid+.
      # Returns the removed counter, or nil when none was stored.
      def proc_succeed(nid)
        @fail_cnt.delete(nid)
      end

      # Delegates to the routing data's create_nodes_from_v_idx and returns
      # its result.
      # NOTE(review): judging by the name this rebuilds the node list from the
      # vnode index - confirm against the routing data class.
      def create_nodes_from_v_idx
        @rd.create_nodes_from_v_idx
      end

      # Returns a copy of the routing data in which node IDs are rewritten by
      # the first sub_nid entry whose netmask contains +addr+, or nil when no
      # entry matches.
      #
      # +addr+:: address string tested against each netmask key of sub_nid
      def sub_nid_rd(addr)
        hit = sub_nid.find { |mask, _rule| check_netmask?(addr, mask) }
        return nil unless hit

        rule = hit[1]
        get_replaced_rd(rule[:regexp], rule[:replace])
      end

      private

      # Builds a deep copy of the routing data (Marshal round trip of dump) and
      # applies String#sub(regxp, replace) to every node ID in the copy, both
      # in the node list and in each vnode's node list.
      #
      # +regxp+::   pattern handed to String#sub
      # +replace+:: replacement handed to String#sub
      #
      # Returns the rewritten copy; the table's own routing data is untouched.
      def get_replaced_rd(regxp, replace)
        copy = Marshal.load(dump)

        id_lists = [copy.nodes] + copy.v_idx.values
        id_lists.each do |nids|
          nids.map! { |nid| nid.sub(regxp, replace) }
        end

        copy
      end

      # Tells whether the IPv4 address in +addr+ lies inside the network
      # described by +mask+.
      #
      # +addr+:: string containing a dotted-quad address, e.g. "192.168.1.5"
      # +mask+:: string containing CIDR notation, e.g. "192.168.0.0/16"
      #
      # Returns true when address and network agree on the prefix bits and
      # false otherwise. Input that does not match the expected format, or that
      # carries an octet above 255, is reported through the error log and
      # yields false. Without the octet check an oversized octet would spill
      # into the neighbouring bits and could match an unrelated network.
      #
      # The patterns are not anchored, so a string that merely contains an
      # address is accepted.
      def check_netmask?(addr, mask)
        if addr =~ /(\d+)\.(\d+)\.(\d+)\.(\d+)/
          addr_octets = Regexp.last_match.captures.map { |field| field.to_i }
        end
        unless addr_octets && addr_octets.all? { |octet| octet <= 255 }
          @log.error("#{__method__}:Illigal format addr #{addr}")
          return false
        end

        if mask =~ /(\d+)\.(\d+)\.(\d+)\.(\d+)\/(\d+)/
          mask_fields = Regexp.last_match.captures.map { |field| field.to_i }
          mask_octets = mask_fields[0, 4]
          prefix_len = mask_fields[4]
        end
        unless mask_octets && mask_octets.all? { |octet| octet <= 255 }
          @log.error("#{__method__}:Illigal format mask #{mask}")
          return false
        end

        # Fold the four octets into one 32-bit integer, most significant first.
        iaddr = addr_octets.inject(0) { |acc, octet| (acc << 8) | octet }
        imask_addr = mask_octets.inject(0) { |acc, octet| (acc << 8) | octet }
        # prefix_len leading one-bits; a prefix of 32 or more gives the full
        # 32-bit mask.
        imask = (2**prefix_len - 1) << (32 - prefix_len)

        (iaddr & imask) == (imask_addr & imask)
      end

    end # class RoutingTable

  end # module Routing
end # module Roma