lib/roma/write_behind.rb

Summary

Maintainability
B
5 hrs
Test Coverage
require 'digest/sha1'
require 'fileutils'
require 'socket'
require 'thread'
require 'timeout'

require 'roma/messaging/con_pool'
require 'roma/stats'

module Roma

  module WriteBehind

    # Appends write-behind records to per-hash-name files laid out as
    #   <path>/<ap_str>/<hname>/<YYYYMMDD>/<n>.wb
    # A file is rotated once it holds at least +shift_size+ bytes or once the
    # day it was opened on is over.
    class FileWriter

      attr_accessor :shift_size

      # path       :: base directory; a single trailing '/' is ignored
      # shift_size :: byte count at which a file is rotated
      # log        :: logger; this class only calls #info on it
      def initialize(path, shift_size, log)
        @stats = Roma::Stats.instance
        # chomp returns a copy, so the caller's string is never modified
        # (and a frozen string is accepted).
        @path = path.chomp('/')
        @log = log
        @fdh = {} # file handle hash
        @fnh = {} # file name hash
        @do_write = false
        @shift_size = shift_size
        @total_size = Hash.new(0)
        @rottime = {} # next time-based rotation, keyed by hash name
      end

      # Returns a Hash of status values: the base path, shift size, the
      # in-progress flag and, for every open file, its path and byte count.
      def get_stat
        ret = {}
        ret['write-behind.path'] = File.expand_path(@path)
        ret['write-behind.shift_size'] = @shift_size
        ret['write-behind.do_write'] = @do_write
        @fnh.each do |hname, fname|
          ret["write-behind[#{hname}].path"] = File.expand_path(fname)
          ret["write-behind[#{hname}].size"] = @total_size[hname]
        end
        ret
      end

      # Appends one record for +hname+, opening or rotating the file first
      # when needed. The record is packed big-endian as:
      #   uint32 timestamp, uint16 cmd, uint32 key bytes, key,
      #   uint32 value bytes, value
      # +val+ is converted with #to_s. Returns the running byte total.
      def write(hname, cmd, key, val)
        @do_write = true
        t = Time.now
        due = @rottime[hname]
        if @total_size[hname] >= @shift_size || (due && t >= due)
          # rotate waits until @do_write is false, so lower the flag around
          # the call and raise it again for the write that follows.
          @do_write = false
          rotate(hname)
          @do_write = true
        end

        fd = @fdh[hname]
        unless fd
          fd = openfile(hname)
          @log.info("WriteBehind file has been created: [#{@fnh[hname]}]")
          @total_size[hname] = 0
        end
        val = val.to_s
        # The count after the 'a' directive is in bytes, so use bytesize;
        # character length would truncate multibyte data.
        klen = key.bytesize
        vlen = val.bytesize
        size = fd.write([t.to_i, cmd, klen, key, vlen, val].pack("NnNa#{klen}Na#{vlen}"))
        @total_size[hname] += size
      ensure
        @do_write = false
      end

      # Closes the current file of +hname+ so that the next write opens a new
      # one. Returns false when no file is open for +hname+, true otherwise.
      def rotate(hname)
        @log.info("WriteBehind:rotate #{hname}")
        fd_old = @fdh[hname]
        unless fd_old
          @log.info("WriteBehind:rotate #{hname} not opend")
          return false
        end
        @fdh.delete(hname)
        @fnh.delete(hname)
        @rottime.delete(hname)
        # let a write that is in progress finish before closing its handle
        sleep 0.01 while @do_write
        fd_old.close
        @log.info("WriteBehind:rotate succeed")
        true
      end

      # Opens the next numbered file in today's directory for +hname+,
      # registers it and returns the handle.
      def openfile(hname)
        t = Time.now
        path = "#{@path}/#{@stats.ap_str}/#{hname}/#{t.strftime('%Y%m%d')}"
        mkdir(path)
        # set a next rotation time (24 hours after today's local midnight)
        @rottime[hname] = Time.local(t.year, t.month, t.day, 0, 0, 0) + 24 * 60 * 60

        # continue after the highest sequence number already present
        max = -1
        Dir.glob("#{path}/*.wb").each do |f|
          next unless /\D(\d+)\.wb\z/ =~ f
          seq = $1.to_i
          max = seq if seq > max
        end
        fname = "#{path}/#{max + 1}.wb"
        # File.open rather than Kernel#open: the latter treats a leading '|'
        # as a command to run.
        fd = File.open(fname, 'wb')
        @fnh[hname] = fname
        @fdh[hname] = fd
      end

      # Returns the absolute directory that holds the files of +hname+.
      def wb_get_path(hname)
        File.expand_path("#{@path}/#{@stats.ap_str}/#{hname}")
      end

      # Returns the absolute path of the file currently open for +hname+,
      # or nil when none is open.
      def get_current_file_path(hname)
        @log.info("WriteBehind:get_current_file_path #{hname}")
        unless @fnh[hname]
          @log.info("WriteBehind:get_current_file_path #{hname} not opend")
          return nil
        end
        File.expand_path("#{@fnh[hname]}")
      end

      # Closes every open file and forgets the handles, so that a later
      # write opens a new file instead of using a closed one.
      def close_all
        @fdh.each_value { |fd| fd.close }
        @fdh.clear
        @fnh.clear
        @rottime.clear
      end

      # Creates +path+ together with any missing parent directories.
      # Raises a SystemCallError when a directory cannot be created.
      def mkdir(path)
        FileUtils.mkdir_p(path)
      end

    end # class FileWriter

    # Sends commands to a replica cluster and keeps local copies of that
    # cluster's mklhash value, node list and routing table.
    class StreamWriter

      attr_accessor :run_replication
      attr_accessor :run_existing_data_replication
      attr_accessor :replica_mklhash
      attr_accessor :replica_nodelist
      attr_accessor :replica_rttable

      # log :: logger; #debug and #error are called on it
      def initialize(log)
        @log = log
        @run_replication = false
        @run_existing_data_replication = false
        @replica_mklhash = nil
        @replica_nodelist = []
        @replica_rttable = nil
        @do_transmit = false
      end

      # Returns a Hash with the replication flags, the cached mklhash and
      # the cached node list.
      def get_stat
        ret = {}
        ret['write-behind.run_replication'] = @run_replication
        ret['write-behind.run_existing_data_replication'] = @run_existing_data_replication
        ret['write-behind.replica_mklhash'] = @replica_mklhash
        ret['write-behind.replica_nodelist'] = @replica_nodelist
        ret
      end

      # Asks the first reachable node of the replica node list for its
      # mklhash and returns true when it differs from the cached value.
      # Nodes that fail are dropped from the list. When no node is left,
      # replication is switched off and false is returned.
      def change_mklhash?
        until @replica_nodelist.empty?
          nid = @replica_nodelist[0]
          begin
            current_mklhash = with_connection(nid) do |con|
              con.write("mklhash 0\r\n")
              con.gets.chomp
            end
            return current_mklhash != @replica_mklhash
          rescue => e
            @log.debug("mklhash query to #{nid} failed: #{e}")
            @replica_nodelist.shift
          end
        end
        @run_replication = false
        @log.error("Replicate Cluster was down.")
        false
      end

      # Fetches the mklhash from +nid+ (1 second limit) and caches it.
      # Errors are logged and not raised.
      def update_mklhash(nid)
        Timeout.timeout(1) do
          with_connection(nid) do |con|
            con.write("mklhash 0\r\n")
            @replica_mklhash = con.gets.chomp
          end
          @log.debug("replica_mklhash has updated: [#{@replica_mklhash}]")
        end
      rescue => e
        @log.error("#{e}\n#{$@}")
      end

      # Fetches the whitespace-separated node list from +nid+ (1 second
      # limit) and caches it. Errors are logged and not raised.
      def update_nodelist(nid)
        Timeout.timeout(1) do
          with_connection(nid) do |con|
            con.write("nodelist\r\n")
            @replica_nodelist = con.gets.chomp.split(' ')
          end
          @log.debug("replica_nodelist has updated: #{@replica_nodelist}")
        end
      rescue => e
        @log.error("#{e}\n#{$@}")
      end

      # Fetches the routing dump from +nid+ (1 second limit) and caches the
      # unmarshalled object. Returns nil when the peer announces a
      # non-positive length or when an error occurs; errors are logged.
      def update_rttable(nid)
        Timeout.timeout(1) do
          rd = with_connection(nid) do |con|
            con.write "routingdump\r\n"
            routes_length = con.gets.to_i
            if routes_length <= 0
              @log.error("#{__method__} process was failed.\r\n") if routes_length < 0
              # leaving the method here makes with_connection close the
              # connection instead of returning it to the pool
              return nil
            end

            routes = String.new
            while routes.bytesize < routes_length
              chunk = con.read(routes_length - routes.bytesize)
              raise EOFError, 'connection closed while reading routing dump' unless chunk
              routes << chunk
            end
            con.read(2) # "\r\n"
            con.gets
            # NOTE(review): Marshal.load on bytes received from the network
            # is only safe when the peer is trusted.
            Marshal.load(routes)
          end
          @replica_rttable = rd
          @log.debug("replica_rttable has updated: [#{@replica_rttable}]")
        end
      rescue => e
        @log.error("#{e}\n#{$@}")
        nil
      end

      # Returns the first node that the cached routing table lists for the
      # SHA1 digest of +key+, or nil (after logging) when the lookup fails.
      def search_replica_primary_node(key)
        d = Digest::SHA1.hexdigest(key).hex % (2**@replica_rttable.dgst_bits)
        nodes = @replica_rttable.v_idx[d & @replica_rttable.search_mask]
        return nodes[0] # for send primary node of replica cluster
      rescue => e
        @log.error("#{e}\n#{$@}")
        nil
      end

      # Sends +cmd+ to the node chosen for +key+ and reads one reply line
      # (5 second limit). Failures are logged together with +key+ and
      # +value+ and are not raised.
      def transmit(cmd, key, value) # value is for error log
        Timeout.timeout(5) do
          @do_transmit = true
          nid = search_replica_primary_node(key)
          with_connection(nid) do |con|
            con.write(cmd)
            con.gets # for return connection
          end
        end
      rescue => e
        @log.error("#{e}\n#{$@}")
        @log.error("replication error: key=#{key} value=#{value}\r\n")
      ensure
        @do_transmit = false
      end

      # Asks the connection pool to close the connections of every node in
      # the replica node list.
      def close_all
        @replica_nodelist.each{|nid|
          Roma::Messaging::ConPool.instance.close_at(nid)
        }
      end

      private

      # Takes a pooled connection to +nid+, yields it and gives it back to
      # the pool when the block completes normally, returning the block's
      # value. When the block raises, times out or leaves via +return+, the
      # connection is closed instead, because a half-finished exchange must
      # not be reused.
      def with_connection(nid)
        pool = Roma::Messaging::ConPool.instance
        con = pool.get_connection(nid)
        result = yield con
        pool.return_connection(nid, con)
        con = nil # handed back to the pool; nothing left to clean up
        result
      ensure
        con.close if con
      end

    end # class StreamWriter

  end # module WriteBehind

  # Mixin that runs a background thread draining a shared queue of
  # write-behind entries. The methods read @wb_writer, @cr_writer and @log,
  # which the including object has to set up.
  module WriteBehindProcess

    @@wb_queue = Queue.new

    # Queues one entry. A nil/false +hname+ marks an entry that is to be
    # transmitted through @cr_writer instead of written through @wb_writer.
    def self.push(hname, cmd, key, val)
      @@wb_queue.push([hname, cmd, key, val])
    end

    # Starts the worker thread and tags it with the name 'write_behind'.
    def start_wb_process
      @wb_thread = Thread.new{
        wb_process_loop
      }
      @wb_thread[:name] = 'write_behind'
    rescue =>e
      @log.error("#{e}\n#{$@}")
    end

    # Waits until the queue is empty, stops the worker thread (when one was
    # started) and closes both writers.
    def stop_wb_process
      until @@wb_queue.empty?
        sleep 0.01
      end
      @wb_thread.exit if @wb_thread
      @wb_writer.close_all
      @cr_writer.close_all
    end

    # Rotates the write-behind file of +hname+.
    def wb_rotate(hname)
      @wb_writer.rotate(hname)
    end

    # Returns the directory @wb_writer reports for +hname+.
    def wb_get_path(hname)
      @wb_writer.wb_get_path(hname)
    end

    # Returns the path of the file currently open for +hname+.
    def wb_get_current_file_path(hname)
      @wb_writer.get_current_file_path(hname)
    end

    # Returns the statistics of both writers merged into one Hash.
    def wb_get_stat
      @wb_writer.get_stat.merge(@cr_writer.get_stat)
    end

    # Worker loop: takes entries off the queue and hands each one to exactly
    # one writer. Any error is logged and the loop starts over.
    def wb_process_loop
      loop {
        while dat = @@wb_queue.pop
          # dat ====> [hname, cmd, key, value]
          hname, cmd, key, val = dat
          if hname
            @wb_writer.write(hname, cmd, key, val)
          else
            # entries without a hash name are for the replica cluster only;
            # they must not be passed to the file writer as well
            @cr_writer.transmit(cmd, key, val)
          end
        end
      }
    rescue =>e
      @log.error("#{e}\n#{$@}")
      retry
    end
    private :wb_process_loop

  end # module WriteBehindProcess

end # module Roma