lib/roma/command/sys_command_receiver.rb

Summary

Maintainability
F
1 wk
Test Coverage
require 'roma/async_process'

module Roma
  module Command

    module SystemCommandReceiver

      # balse [reason]
      # Asks the client to confirm, then disables failover, broadcasts
      # "rbalse" to the other nodes, reports the collected replies and flags
      # the event loop to stop. Anything other than "yes\r\n" aborts.
      def ev_balse(s)
        send_data("Are you sure?(yes/no)\r\n")
        unless gets == "yes\r\n"
          close_connection_after_writing
          return
        end

        # A two-element command line carries the optional reason in s[1].
        note = s.length == 2 ? "Receive a balse #{s[1]}" : "Receive a balse command."
        @log.info(note)

        @rttable.enabled_failover = false
        replies = broadcast_cmd("rbalse\r\n")
        replies[@stats.ap_str] = "BYE"
        send_data("#{replies.inspect}\r\n")
        close_connection_after_writing
        @stop_event_loop = true
      end

      # rbalse [reason]
      # Handles the broadcast form of balse: logs the request, disables
      # failover, answers "BYE" and flags the event loop to stop.
      def ev_rbalse(s)
        # A two-element command line carries the optional reason in s[1].
        note = s.length == 2 ? "Receive a rbalse #{s[1]}" : "Receive a rbalse command."
        @log.info(note)

        @rttable.enabled_failover = false
        send_data("BYE\r\n")
        close_connection_after_writing
        @stop_event_loop = true
      end

      # shutdown [reason]
      # Asks the client to confirm, then disables failover, broadcasts
      # "rshutdown" to the other nodes, reports the collected replies and
      # flags the event loop to stop. Anything other than "yes\r\n" aborts.
      def ev_shutdown(s)
        send_data("*** ARE YOU REALLY SURE TO SHUTDOWN? *** (yes/no)\r\n")
        unless gets == "yes\r\n"
          close_connection_after_writing
          return
        end

        # A two-element command line carries the optional reason in s[1].
        note = s.length == 2 ? "Receive a shutdown #{s[1]}" : "Receive a shutdown command."
        @log.info(note)

        @rttable.enabled_failover = false
        replies = broadcast_cmd("rshutdown\r\n")
        replies[@stats.ap_str] = "BYE"
        send_data("#{replies.inspect}\r\n")
        close_connection_after_writing
        @stop_event_loop = true
      end

      # rshutdown [reason]
      # Handles the broadcast form of shutdown: logs the request, disables
      # failover, answers "BYE" and flags the event loop to stop.
      def ev_rshutdown(s)
        # A two-element command line carries the optional reason in s[1].
        note = s.length == 2 ? "Receive a rshutdown #{s[1]}" : "Receive a rshutdown command."
        @log.info(note)

        @rttable.enabled_failover = false
        send_data("BYE\r\n")
        close_connection_after_writing
        @stop_event_loop = true
      end

      # shutdown_self
      # Stops only this instance. The command takes no arguments; after a
      # warning banner the client must answer "yes\r\n" to proceed.
      def ev_shutdown_self(s)
        return send_data("ERROR: shutdown_instance has irregular argument.\r\n") if s.length != 1

        warning = [
          "\r\n=================================================================\r\n",
          "CAUTION!!: \r\n\tThis command kill the instance!\r\n\tThere is some possibility of occuring redundancy down!\r\n",
          "=================================================================\r\n",
          "\r\nAre you sure to shutdown this instance?(yes/no)\r\n"
        ]
        warning.each { |chunk| send_data(chunk) }

        unless gets == "yes\r\n"
          close_connection_after_writing
          return
        end

        @log.info("Receive a shutdown_self command.")
        @rttable.enabled_failover = false
        send_data("BYE\r\n")
        @stop_event_loop = true
        close_connection_after_writing
      end

      # version
      # Replies with the ROMA version string; the argument list is unused.
      def ev_version(s)
        banner = "VERSION ROMA-#{Roma::VERSION}\r\n"
        send_data(banner)
      end

      # quit
      # Closes the client connection; the argument list is unused.
      def ev_quit(s)
        close_connection
      end

      # whoami
      # Replies with the name held by @stats; the argument list is unused.
      def ev_whoami(s)
        node_name = @stats.name
        send_data("#{node_name}\r\n")
      end

      # stats [regexp]
      # "stats" is handled exactly like "stat".
      def ev_stats(s)
        ev_stat(s)
      end

      # stat [regexp]
      # Emits every statistics group through send_stat_result and finishes
      # with "END". An optional pattern (s[1], only when exactly one argument
      # is given) is passed along as the key filter.
      def ev_stat(s)
        filter = s.length == 2 ? s[1] : nil

        send_stat_result(nil, { 'version' => Roma::VERSION }, filter)
        send_stat_result(nil, get_config_stat, filter)
        send_stat_result(nil, @stats.get_stat, filter)
        @storages.each do |hash_name, storage|
          send_stat_result("storages[#{hash_name}].", storage.get_stat, filter)
        end
        send_stat_result(nil, $roma.wb_get_stat, filter)
        send_stat_result(nil, @rttable.get_stat(@stats.ap_str), filter)
        send_stat_result(nil, conn_get_stat, filter)
        send_stat_result(nil, DNSCache.instance.get_stat, filter)
        send_data("END\r\n")
      end

      # Sends one "<key> <value>" line per entry of h. Keys are prefixed with
      # +prefix+ when it is given; when +regexp+ is given only keys matching
      # that pattern are sent.
      def send_stat_result(prefix,h,regexp = nil)
        h.each do |name, value|
          label = prefix ? "#{prefix}#{name}" : "#{name}"
          next if regexp && label !~ /#{regexp}/

          send_data("#{label} #{value}\r\n")
        end
      end
      private :send_stat_result

      # writebehind_rotate [hash_name]
      # Broadcasts the rotate request for hash s[1], rotates locally via
      # $roma.wb_rotate and reports every node's status.
      def ev_writebehind_rotate(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        hash_name = s[1]
        replies = broadcast_cmd("rwritebehind_rotate #{hash_name}\r\n")

        rotated = $roma.wb_rotate(hash_name)
        replies[@stats.ap_str] = rotated ? "ROTATED" : "NOT_OPEND"
        send_data("#{replies}\r\n")
      end

      # rwritebehind_rotate [hash_name]
      # Rotates the write-behind file of hash s[1] on this node only and
      # reports whether $roma.wb_rotate succeeded.
      def ev_rwritebehind_rotate(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        rotated = $roma.wb_rotate(s[1])
        send_data(rotated ? "ROTATED\r\n" : "NOT_OPEND\r\n")
      end

      # writebehind_get_path [hash_name]
      # Broadcasts the path query for hash s[1], adds this node's own
      # $roma.wb_get_path answer and reports the combined replies.
      def ev_writebehind_get_path(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        hash_name = s[1]
        replies = broadcast_cmd("rwritebehind_get_path #{hash_name}\r\n")

        local_path = $roma.wb_get_path(hash_name)
        replies[@stats.ap_str] = local_path

        send_data("#{replies}\r\n")
      end

      # rwritebehind_get_path [hash_name]
      # Replies with this node's $roma.wb_get_path result for hash s[1].
      def ev_rwritebehind_get_path(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        local_path = $roma.wb_get_path(s[1])
        send_data("#{local_path}\r\n")
      end

      # writebehind_get_current_file [hash_name]
      # Broadcasts the current-file query for hash s[1] and adds this node's
      # own answer; a falsy local result is reported as "NOT_OPEND".
      def ev_writebehind_get_current_file(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        hash_name = s[1]
        replies = broadcast_cmd("rwritebehind_get_current_file #{hash_name}\r\n")

        current = $roma.wb_get_current_file_path(hash_name)
        replies[@stats.ap_str] = current || "NOT_OPEND"
        send_data("#{replies}\r\n")
      end

      # rwritebehind_get_current_file [hash_name]
      # Replies with this node's current write-behind file path for hash
      # s[1], or "NOT_OPEND" when $roma reports none.
      def ev_rwritebehind_get_current_file(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        current = $roma.wb_get_current_file_path(s[1])
        send_data(current ? "#{current}\r\n" : "NOT_OPEND\r\n")
      end

      # switch_replication command is change status of cluster replication
      # if you want to activate, assign 1 nid(addr_port) of replication cluster as argument.
      # if you want to copy existing data, add the 'all' after nid as argument
      # switch_replication <true|false> [nid] [copy target]
      # Validates the arguments, broadcasts "rswitch_replication" and then
      # applies the same switch to this node's $roma.cr_writer within a
      # one-second timeout. Any StandardError is reported to the client as
      # "<class>: <message>".
      def ev_switch_replication(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length.between?(2, 4)
        return send_data("CLIENT_ERROR value must be true or false\r\n") unless s[1] =~ /^(true|false)$/
        return send_data("CLIENT_ERROR [copy target] must be all or nil\r\n") if s[3] && s[3] != 'all'

        replies = broadcast_cmd("rswitch_replication #{s[1]} #{s[2]} #{s[3]}\r\n")

        Timeout.timeout(1) do
          writer = $roma.cr_writer
          if s[1] == 'true'
            nid = s[2]
            writer.update_mklhash(nid)
            writer.update_nodelist(nid)
            writer.update_rttable(nid)
            writer.run_replication = true
            if s[3] == 'all'
              # 'all' additionally queues replication of the existing data.
              writer.run_existing_data_replication = true
              message = Roma::AsyncMessage.new('start_replicate_existing_data_process', [writer.replica_rttable])
              Roma::AsyncProcess::queue.push(message)
            end
            replies[@stats.ap_str] = "ACTIVATED"
          elsif s[1] == 'false'
            writer.replica_mklhash = nil
            writer.replica_nodelist = []
            writer.replica_rttable = nil
            writer.run_replication = false
            writer.run_existing_data_replication = false
            replies[@stats.ap_str] = "DEACTIVATED"
          end
        end
        send_data("#{replies}\r\n")
      rescue => e
        send_data("#{e.class}: #{e}\r\n")
      end

      # rswitch_replication <true|false> [nid] [copy target]
      # Applies a replication switch to this node's $roma.cr_writer (the
      # broadcast form of switch_replication) within a one-second timeout.
      # Replies "ACTIVATED" or "DEACTIVATED"; any StandardError is reported
      # to the client as "<class>: <message>".
      #
      # All error replies are terminated with "\r\n" like every other reply
      # in this receiver (two of them previously ended in "\n\r").
      def ev_rswitch_replication(s)
        unless s.length.between?(2, 4)
          return send_data("CLIENT_ERROR number of arguments\r\n")
        end
        unless s[1] =~ /^(true|false)$/
          return send_data("CLIENT_ERROR value must be true or false\r\n")
        end
        if s[3] && s[3] != 'all'
          return send_data("CLIENT_ERROR [copy target] must be all or nil\r\n")
        end

        Timeout.timeout(1){
          case s[1]
          when 'true'
            $roma.cr_writer.update_mklhash(s[2])
            $roma.cr_writer.update_nodelist(s[2])
            $roma.cr_writer.update_rttable(s[2])
            $roma.cr_writer.run_replication = true
            if s[3] == 'all'
              # 'all' additionally queues replication of the existing data.
              $roma.cr_writer.run_existing_data_replication = true
              Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_replicate_existing_data_process', [$roma.cr_writer.replica_rttable]))
            end
            send_data("ACTIVATED\r\n")
          when 'false'
            $roma.cr_writer.replica_mklhash = nil
            $roma.cr_writer.replica_nodelist = []
            $roma.cr_writer.replica_rttable = nil
            $roma.cr_writer.run_replication = false
            $roma.cr_writer.run_existing_data_replication = false
            send_data("DEACTIVATED\r\n")
          end
        }
      rescue => e
        send_data("#{e.class}: #{e}\r\n")
      end

      # dcnice command is setting priority for a data-copy thread.
      # a niceness of 1 is the highest priority and 5 is the lowest priority.
      # dcnice <priority:1 to 5>
      # Broadcasts the new priority, applies it locally through dcnice and
      # reports every node's result.
      def ev_dcnice(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        priority = s[1]
        replies = broadcast_cmd("rdcnice #{priority}\r\n")
        replies[@stats.ap_str] = dcnice(priority.to_i)
        send_data("#{replies}\r\n")
      end

      # rdcnice <priority>
      # Applies the priority on this node only and replies with dcnice's result.
      def ev_rdcnice(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        outcome = dcnice(s[1].to_i)
        send_data("#{outcome}\r\n")
      end

      # restart
      # Broadcasts "rrestart", then sets $roma.eventloop, disables failover,
      # closes both connection pools and stops the EventMachine loop before
      # reporting every node's reply.
      def ev_restart(s)
        replies = broadcast_cmd("rrestart\r\n")

        $roma.eventloop = true
        @rttable.enabled_failover = false
        Messaging::ConPool.instance.close_all
        Event::EMConPool.instance.close_all
        EventMachine.stop_event_loop

        replies[@stats.ap_str] = "RESTARTED"
        send_data("#{replies}\r\n")
      end

      # rrestart
      # Performs the restart steps on this node only: sets $roma.eventloop,
      # disables failover, closes both connection pools, stops the
      # EventMachine loop and replies "RESTARTED".
      def ev_rrestart(s)
        $roma.eventloop = true
        @rttable.enabled_failover = false

        Messaging::ConPool.instance.close_all
        Event::EMConPool.instance.close_all
        EventMachine.stop_event_loop

        send_data("RESTARTED\r\n")
      end

      # set_log_level [ 'debug' | 'info' | 'warn' | 'error' ]
      # Switches the logger severity to the level named by s[1]
      # (case-insensitive) and records the lower-cased name in @stats.
      def ev_set_log_level(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        level_name = s[1].downcase
        unless %w[debug info warn error].include?(level_name)
          return send_data("CLIENT_ERROR no match log-level string\r\n")
        end

        # The severity constants carry the upper-cased level names.
        @log.level = Roma::Logging::RLogger::Severity.const_get(level_name.upcase)
        @stats.log_level = level_name

        send_data("STORED\r\n")
      end

      # out <key> <vn>
      # Removes <key> from its storage. The key may carry a hash name after
      # an ESC ("\e") separator; otherwise @defhash is used. When <vn> is
      # omitted the vnode id is derived from the SHA1 digest of the key.
      #
      # Replies "DELETED" or "NOT_DELETED" according to the storage's out
      # result. A missing key argument yields a CLIENT_ERROR and an unknown
      # hash name a SERVER_ERROR (the same reply ev_rset gives) instead of
      # raising NoMethodError on nil.
      def ev_out(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        key,hname = s[1].split("\e")
        hname ||= @defhash
        unless @storages.key?(hname)
          return send_data("SERVER_ERROR #{hname} does not exists.\r\n")
        end

        if s.length >= 3
          vn = s[2].to_i
        else
          d = Digest::SHA1.hexdigest(key).hex % @rttable.hbits
          vn = @rttable.get_vnode_id(d)
        end
        res = @storages[hname].out(vn, key, 0)
        # Counted for every attempted deletion, successful or not.
        @stats.out_message_count += 1
        return send_data("NOT_DELETED\r\n") unless res

        send_data("DELETED\r\n")
      end

      # rset <key> <hash value> <timelimit> <length>
      # "set" means "store this data".
      # <command name> <key> <digest> <exptime> <bytes> [noreply]\r\n
      # <data block>\r\n
      # Stores a data block through the storage's rset. The key may carry a
      # hash name after an ESC ("\e") separator; a digest of 0 is replaced
      # by one derived from the SHA1 of the key.
      def ev_rset(s)
        key, hname = s[1].split("\e")
        hname ||= @defhash

        digest = s[2].to_i
        digest = Digest::SHA1.hexdigest(key).hex % @rttable.hbits if digest.zero?

        # The payload is read before the hash name is checked, so the bytes
        # are consumed even when the request is rejected.
        payload = read_bytes(s[5].to_i)
        read_bytes(2) # presumably the trailing "\r\n" of the data block
        vn = @rttable.get_vnode_id(digest)

        unless @storages.key?(hname)
          send_data("SERVER_ERROR #{hname} does not exists.\r\n")
          return
        end

        storage = @storages[hname]
        if storage.rset(vn, key, digest, s[3].to_i, s[4].to_i, payload)
          send_data("STORED\r\n")
        else
          @log.error("rset NOT_STORED:#{storage.error_message} #{vn} #{s[1]} #{digest} #{s[3]} #{s[4]}")
          send_data("NOT_STORED\r\n")
        end
        @stats.redundant_count += 1
      end

      # <command name> <key> <digest> <exptime> <bytes> [noreply]\r\n
      # <compressed data block>\r\n
      # Same as rset, but the data block is zlib-compressed and is inflated
      # before it is handed to the storage. Data that fails to inflate is
      # logged and answered with "NOT_STORED".
      def ev_rzset(s)
        key, hname = s[1].split("\e")
        hname ||= @defhash

        digest = s[2].to_i
        digest = Digest::SHA1.hexdigest(key).hex % @rttable.hbits if digest.zero?

        # The payload is read before the hash name is checked, so the bytes
        # are consumed even when the request is rejected.
        compressed = read_bytes(s[5].to_i)
        read_bytes(2) # presumably the trailing "\r\n" of the data block
        vn = @rttable.get_vnode_id(digest)

        unless @storages.key?(hname)
          send_data("SERVER_ERROR #{hname} does not exists.\r\n")
          return
        end

        inflated = Zlib::Inflate.inflate(compressed)
        storage = @storages[hname]
        if storage.rset(vn, key, digest, s[3].to_i, s[4].to_i, inflated)
          send_data("STORED\r\n")
        else
          @log.error("rzset NOT_STORED:#{storage.error_message} #{vn} #{s[1]} #{digest} #{s[3]} #{s[4]}")
          send_data("NOT_STORED\r\n")
        end
        @stats.redundant_count += 1
      rescue Zlib::DataError => e
        @log.error("rzset NOT_STORED:#{e} #{vn} #{s[1]} #{digest} #{s[3]} #{s[4]}")
        send_data("NOT_STORED\r\n")
      end

      # forcedly_start
      # Clears the async process queue, enables failover, rebuilds the
      # command list via Command::Receiver.mk_evlist, clears $roma.startup
      # and replies "STARTED".
      def ev_forcedly_start(s)
        @log.info("ROMA forcedly start.")

        AsyncProcess.queue.clear
        @rttable.enabled_failover = true
        Command::Receiver.mk_evlist
        $roma.startup = false

        send_data("STARTED\r\n")
      end

      # switch_failover <on|off>
      # Broadcasts the switch and applies it locally. "on" closes both
      # connection pools before enabling failover; any value other than
      # on/off leaves this node unchanged and is reported as "NOTSWITCHED".
      def ev_switch_failover(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        replies = broadcast_cmd("rswitch_failover #{s[1]}\r\n")
        case s[1]
        when 'on'
          Messaging::ConPool.instance.close_all
          Event::EMConPool.instance.close_all
          @rttable.enabled_failover = true
          @log.info("failover enabled")
          replies[@stats.ap_str] = "ENABLED"
        when 'off'
          @rttable.enabled_failover = false
          @log.info("failover disabled")
          replies[@stats.ap_str] = "DISABLED"
        else
          replies[@stats.ap_str] = "NOTSWITCHED"
        end
        send_data("#{replies}\r\n")
      end

      # rswitch_failover <on|off>
      # Applies the failover switch on this node only. "on" closes both
      # connection pools before enabling failover; any value other than
      # on/off is answered with "NOTSWITCHED".
      def ev_rswitch_failover(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        case s[1]
        when 'on'
          Messaging::ConPool.instance.close_all
          Event::EMConPool.instance.close_all
          @rttable.enabled_failover = true
          @log.info("failover enabled")
          send_data("ENABLED\r\n")
        when 'off'
          @rttable.enabled_failover = false
          @log.info("failover disabled")
          send_data("DISABLED\r\n")
        else
          send_data("NOTSWITCHED\r\n")
        end
      end

      # set_descriptor_table_size <size>
      # Broadcasts the new size and passes it to EM.set_descriptor_table_size
      # on this node. Sizes below 1024 are rejected.
      def ev_set_descriptor_table_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than 1024\r\n") if size < 1024

        replies = broadcast_cmd("rset_descriptor_table_size #{s[1]}\r\n")

        EM.set_descriptor_table_size(size)
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_descriptor_table_size <size>
      # Passes the size to EM.set_descriptor_table_size on this node only.
      # Sizes below 1024 are rejected.
      def ev_rset_descriptor_table_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than 1024\r\n") if size < 1024

        EM.set_descriptor_table_size(size)
        send_data("STORED\r\n")
      end

      # set_latency_avg_calc_rule <mode> <count> <command1> <command2>....
      # <mode> is on/off
      # <count> is denominator to calculate average.
      # <commandx> is target command
      # Validates the rule, broadcasts it as "rset_latency_avg_calc_rule" and
      # applies it to @stats. "on" needs a count >= 1 and at least one target
      # command; "off" takes no further arguments. The collected latency data
      # is reset in both cases.
      def ev_set_latency_avg_calc_rule(s)
        mode = s[1]

        # check arguments
        unless /^on$|^off$/ =~ mode
          return send_data("CLIENT_ERROR argument 1: please input \"on\" or \"off\"\r\n")
        end
        if mode == "on" && (s.length <= 3 || s[2].to_i < 1)
          return send_data("CLIENT_ERROR number of arguments (0 for 3) and <count> must be greater than zero\r\n")
        end
        if mode == "off" && s.length != 2
          return send_data("CLIENT_ERROR number of arguments (0 for 1, or more 3)\r\n")
        end

        # check supported commands: must be registered and not a system command
        targets = s.drop(3)
        targets.each do |cmd|
          if !Event::Handler::ev_list.include?(cmd) || Event::Handler::system_commands.include?(cmd)
            return send_data("NOT SUPPORT [#{cmd}] command\r\n")
          end
        end

        # forward the arguments unchanged under the broadcast command name
        forwarded = (["rset_latency_avg_calc_rule"] + s.drop(1)).join(" ")
        replies = broadcast_cmd("#{forwarded}\r\n")

        case mode
        when "on"
          @stats.latency_check_cmd = [] #reset
          targets.each { |cmd| @stats.latency_check_cmd.push(cmd) }
          @stats.latency_check_time_count = s[2].to_i
          @stats.latency_log = true
          replies[@stats.ap_str] = "ACTIVATED"
        when "off"
          @stats.latency_check_cmd = [] #reset
          @stats.latency_check_time_count = false
          @stats.latency_log = false
          replies[@stats.ap_str] = "DEACTIVATED"
        end
        @stats.latency_data = Hash.new { |hash, key| hash[key] = {} }
        send_data("#{replies}\r\n")
      end

      # rset_latency_avg_calc_rule <mode> <count> <command1> <command2>....
      # Applies a latency rule on this node only (the broadcast form of
      # set_latency_avg_calc_rule). "on" needs a count >= 1 and at least one
      # target command; "off" takes no further arguments.
      #
      # The collected latency data is reset on @stats, the object that
      # set_latency_avg_calc_rule and the del handlers use, rather than on an
      # unrelated @latency_data variable of this handler.
      def ev_rset_latency_avg_calc_rule(s)
        if /^on$|^off$/ !~ s[1]
          return send_data("CLIENT_ERROR argument 1: please input \"on\" or \"off\"\r\n")
        elsif s[1] == "on" && (s.length <= 3 || s[2].to_i < 1)
          return send_data("CLIENT_ERROR number of arguments (0 for 3) and <count> must be greater than zero\r\n")
        elsif s[1] == "off" && !(s.length == 2)
          return send_data("CLIENT_ERROR number of arguments (0 for 1, or more 3)\r\n")
        end

        # Targets must be registered commands and must not be system commands.
        targets = s.drop(3)
        targets.each do |cmd|
          if !Event::Handler::ev_list.include?(cmd) || Event::Handler::system_commands.include?(cmd)
            return send_data("NOT SUPPORT [#{cmd}] command\r\n")
          end
        end

        if s[1] == "on"
          @stats.latency_data = Hash.new { |hash, key| hash[key] = {} }
          @stats.latency_check_cmd = []
          targets.each { |cmd| @stats.latency_check_cmd.push(cmd) }
          @stats.latency_check_time_count = s[2].to_i
          @stats.latency_log = true
          send_data("ACTIVATED\r\n")
        elsif s[1] == "off"
          @stats.latency_data = Hash.new { |hash, key| hash[key] = {} }
          @stats.latency_check_cmd = []
          @stats.latency_check_time_count = false
          @stats.latency_log = false
          send_data("DEACTIVATED\r\n")
        end
      end

      # add_latency_avg_calc_cmd <command1> <command2>....
      # Adds the given commands to the latency-check list after validating
      # each one, broadcasting the same request as
      # "radd_latency_avg_calc_cmd" first.
      def ev_add_latency_avg_calc_cmd(s)
        # check argument
        return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n") if s.length < 2

        # check supported commands: registered, not a system command, not yet set
        additions = s.drop(1)
        additions.each do |cmd|
          if !Event::Handler::ev_list.include?(cmd) || Event::Handler::system_commands.include?(cmd)
            return send_data("NOT SUPPORT [#{cmd}] command\r\n")
          end
          return send_data("ALREADY SET [#{cmd}] command\r\n") if @stats.latency_check_cmd.include?(cmd)
        end

        forwarded = (["radd_latency_avg_calc_cmd"] + additions).join(" ")
        replies = broadcast_cmd("#{forwarded}\r\n")

        additions.each { |cmd| @stats.latency_check_cmd.push(cmd) }
        replies[@stats.ap_str] = "SET"
        send_data("#{replies}\r\n")
      end

      # radd_latency_avg_calc_cmd <command1> <command2>....
      # Adds the given commands to this node's latency-check list (the
      # broadcast form of add_latency_avg_calc_cmd).
      #
      # Every command argument, including the first one, must be a
      # registered non-system command that is not already in the list; the
      # support check previously skipped s[1].
      def ev_radd_latency_avg_calc_cmd(s)
        if s.length < 2
          return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n")
        end

        additions = s.drop(1)
        additions.each do |cmd|
          if !Event::Handler::ev_list.include?(cmd) || Event::Handler::system_commands.include?(cmd)
            return send_data("NOT SUPPORT [#{cmd}] command\r\n")
          end
          if @stats.latency_check_cmd.include?(cmd)
            return send_data("ALREADY SET [#{cmd}] command\r\n")
          end
        end

        additions.each { |cmd| @stats.latency_check_cmd.push(cmd) }
        send_data("SET\r\n")
      end

      # del_latency_avg_calc_cmd <command1> <command2>....
      # Removes the given commands from the latency-check list and drops
      # their collected latency data, broadcasting the same request as
      # "rdel_latency_avg_calc_cmd" first.
      def ev_del_latency_avg_calc_cmd(s)
        # check argument
        return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n") if s.length < 2

        # every target must currently be in the list
        removals = s.drop(1)
        removals.each do |cmd|
          return send_data("[#{cmd}] command is NOT set\r\n") unless @stats.latency_check_cmd.include?(cmd)
        end

        forwarded = (["rdel_latency_avg_calc_cmd"] + removals).join(" ")
        replies = broadcast_cmd("#{forwarded}\r\n")

        removals.each do |cmd|
          @stats.latency_check_cmd.delete(cmd)
          @stats.latency_data.delete(cmd)
        end
        replies[@stats.ap_str] = "DELETED"
        send_data("#{replies}\r\n")
      end

      # rdel_latency_avg_calc_cmd <command1> <command2>....
      # Removes the given commands from this node's latency-check list and
      # drops their collected latency data.
      def ev_rdel_latency_avg_calc_cmd(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n") if s.length < 2

        # every target must currently be in the list
        removals = s.drop(1)
        removals.each do |cmd|
          return send_data("[#{cmd}] command is NOT set\r\n") unless @stats.latency_check_cmd.include?(cmd)
        end

        removals.each do |cmd|
          @stats.latency_check_cmd.delete(cmd)
          @stats.latency_data.delete(cmd)
        end
        send_data("DELETED\r\n")
      end

      # chg_latency_avg_calc_time_count <count>
      # Changes the count used for the latency average, broadcasting the
      # same request as "rchg_latency_avg_calc_time_count" first. A count of
      # "nil" deactivates latency logging; any other value must convert to
      # an integer >= 1.
      #
      # An invalid count is answered with the intended error message; a
      # leftover debug reply (the class of s[1]) used to shadow it.
      def ev_chg_latency_avg_calc_time_count(s)
        #check argument
        if s.length != 2
          return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n")
        elsif s[1] != "nil" && s[1].to_i < 1
          return send_data("<count> must be greater than zero or nil[DEACTIVATE]\r\n")
        end

        res = broadcast_cmd("rchg_latency_avg_calc_time_count #{s[1]}\r\n")

        if s[1] == "nil"
          @stats.latency_check_time_count = false
          @stats.latency_log = false
        else
          @stats.latency_check_time_count = s[1].to_i
          @stats.latency_log = true
        end
        res[@stats.ap_str] = "CHANGED"
        send_data("#{res}\r\n")
      end

      # rchg_latency_avg_calc_time_count <count>
      # Changes the count used for the latency average on this node only.
      # A count of "nil" deactivates latency logging; any other value must
      # convert to an integer >= 1.
      #
      # The "nil" case keeps latency_check_time_count at false, as
      # chg_latency_avg_calc_time_count does; a trailing unconditional
      # assignment used to overwrite it with 0.
      def ev_rchg_latency_avg_calc_time_count(s)
        if s.length != 2
          return send_data("CLIENT_ERROR number of arguments (0 for 2)\r\n")
        elsif s[1] != "nil" && s[1].to_i < 1
          return send_data("<count> must be greater than zero\r\n")
        end

        if s[1] == "nil"
          @stats.latency_check_time_count = false
          @stats.latency_log = false
        else
          @stats.latency_check_time_count = s[1].to_i
          @stats.latency_log = true
        end
        send_data("CHANGED\r\n")
      end

      # set_continuous_limit <value>
      # Broadcasts the value, hands it to Event::Handler.set_ccl on this
      # node and reports every node's result.
      def ev_set_continuous_limit(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        replies = broadcast_cmd("rset_continuous_limit #{s[1]}\r\n")

        accepted = Event::Handler.set_ccl(s[1])
        replies[@stats.ap_str] = accepted ? "STORED" : "NOT_STORED"
        send_data("#{replies}\r\n")
      end

      # rset_continuous_limit <value>
      # Hands the value to Event::Handler.set_ccl on this node only.
      def ev_rset_continuous_limit(s)
        return send_data("CLIENT_ERROR number of arguments (0 for 1)\r\n") if s.length < 2

        accepted = Event::Handler.set_ccl(s[1])
        send_data(accepted ? "STORED\r\n" : "NOT_STORED\r\n")
      end

      # set_connection_pool_maxlength <length>
      # set to max length of the connection pool
      # Broadcasts the new length and applies it to Messaging::ConPool on
      # this node. The length must convert to an integer >= 1.
      def ev_set_connection_pool_maxlength(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        length = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if length < 1

        replies = broadcast_cmd("rset_connection_pool_maxlength #{s[1]}\r\n")
        Messaging::ConPool.instance.maxlength = length
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_connection_pool_maxlength <length>
      # Applies the new length to Messaging::ConPool on this node only.
      # The length must convert to an integer >= 1.
      def ev_rset_connection_pool_maxlength(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        length = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if length < 1

        Messaging::ConPool.instance.maxlength = length
        send_data("STORED\r\n")
      end

      # set_emconnection_pool_maxlength <length>
      # set to max length of the EM connection pool
      # Broadcasts the new length and applies it to Event::EMConPool on this
      # node. The length must convert to an integer >= 1.
      def ev_set_emconnection_pool_maxlength(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        length = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if length < 1

        replies = broadcast_cmd("rset_emconnection_pool_maxlength #{s[1]}\r\n")
        Event::EMConPool.instance.maxlength = length
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_emconnection_pool_maxlength <length>
      # Applies the new length to Event::EMConPool on this node only.
      # The length must convert to an integer >= 1.
      def ev_rset_emconnection_pool_maxlength(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        length = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if length < 1

        Event::EMConPool.instance.maxlength = length
        send_data("STORED\r\n")
      end

      # set_accepted_connection_expire_time <sec>
      # set to expired time(sec) for accepted connections
      # Broadcasts the new value and stores its integer conversion in
      # Event::Handler.connection_expire_time on this node.
      def ev_set_accepted_connection_expire_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        replies = broadcast_cmd("rset_accepted_connection_expire_time #{s[1]}\r\n")
        Event::Handler.connection_expire_time = s[1].to_i
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_accepted_connection_expire_time <sec>
      # Stores the integer conversion of s[1] in
      # Event::Handler.connection_expire_time on this node only.
      def ev_rset_accepted_connection_expire_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        Event::Handler.connection_expire_time = s[1].to_i
        send_data("STORED\r\n")
      end

      # set_connection_pool_expire_time <sec>
      # set to expired time(sec) for connection_pool expire time
      # Broadcasts the new value and stores its integer conversion as the
      # expire time of Messaging::ConPool on this node.
      def ev_set_connection_pool_expire_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        replies = broadcast_cmd("rset_connection_pool_expire_time #{s[1]}\r\n")
        Messaging::ConPool.instance.expire_time = s[1].to_i
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_connection_pool_expire_time <sec>
      # Stores the integer conversion of s[1] as the expire time of
      # Messaging::ConPool on this node only.
      def ev_rset_connection_pool_expire_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        Messaging::ConPool.instance.expire_time = s[1].to_i
        send_data("STORED\r\n")
      end

      # set_emconnection_pool_expire_time <sec>
      # Broadcasts the new value and stores it as the expire time of
      # Event::EMConPool on this node.
      def ev_set_emconnection_pool_expire_time(s)
        # only the argument count is checked; the value need not be numeric
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        # A value converting to 0 means "never expire": it is replaced in
        # place by 2147483647, so the broadcast carries the large value too.
        s[1] = "2147483647" if s[1].to_i.zero?

        replies = broadcast_cmd("rset_emconnection_pool_expire_time #{s[1]}\r\n")
        Event::EMConPool.instance.expire_time = s[1].to_i
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_emconnection_pool_expire_time <sec>
      # Stores the integer conversion of s[1] as the expire time of
      # Event::EMConPool on this node only.
      def ev_rset_emconnection_pool_expire_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        Event::EMConPool.instance.expire_time = s[1].to_i
        send_data("STORED\r\n")
      end

      # switch_dns_caching <on|off|true|false>
      # Broadcasts the switch and applies it to DNSCache on this node. Any
      # value other than on/true/off/false leaves this node unchanged and is
      # reported as "NOTSWITCHED".
      def ev_switch_dns_caching(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        replies = broadcast_cmd("rswitch_dns_caching #{s[1]}\r\n")
        case s[1]
        when 'on', 'true'
          DNSCache.instance.enable_dns_cache
          @log.info("DNS caching enabled")
          replies[@stats.ap_str] = "ENABLED"
        when 'off', 'false'
          DNSCache.instance.disable_dns_cache
          @log.info("DNS caching disabled")
          replies[@stats.ap_str] = "DISABLED"
        else
          replies[@stats.ap_str] = "NOTSWITCHED"
        end
        send_data("#{replies}\r\n")
      end

      # rswitch_dns_caching <on|off|true|false>
      # Applies the DNS-caching switch to DNSCache on this node only. Any
      # value other than on/true/off/false is answered with "NOTSWITCHED".
      def ev_rswitch_dns_caching(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        case s[1]
        when 'on', 'true'
          DNSCache.instance.enable_dns_cache
          @log.info("DNS caching enabled")
          send_data("ENABLED\r\n")
        when 'off', 'false'
          DNSCache.instance.disable_dns_cache
          @log.info("DNS caching disabled")
          send_data("DISABLED\r\n")
        else
          send_data("NOTSWITCHED\r\n")
        end
      end

      # set_hilatency_warn_time <sec>
      # set to threshold of warn message into a log when hilatency occured in a command.
      # Broadcasts the new threshold and stores its float conversion in
      # @stats.hilatency_warn_time. The value must be greater than 0.
      def ev_set_hilatency_warn_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        seconds = s[1].to_f
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if seconds <= 0

        replies = broadcast_cmd("rset_hilatency_warn_time #{s[1]}\r\n")
        @stats.hilatency_warn_time = seconds
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_hilatency_warn_time <sec>
      # Stores the float conversion of s[1] in @stats.hilatency_warn_time on
      # this node only. The value must be greater than 0.
      def ev_rset_hilatency_warn_time(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        seconds = s[1].to_f
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if seconds <= 0

        @stats.hilatency_warn_time = seconds
        send_data("STORED\r\n")
      end

      # set_routing_trans_timeout <sec>
      # Validates the timeout, forwards "rset_routing_trans_timeout" through
      # broadcast_cmd, stores the value in @stats and replies with the
      # collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to a float greater than 0.
      def ev_set_routing_trans_timeout(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        timeout = s[1].to_f
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if timeout <= 0

        res = broadcast_cmd("rset_routing_trans_timeout #{s[1]}\r\n")
        @stats.routing_trans_timeout = timeout
        res[@stats.ap_str] = "STORED"
        send_data("#{res}\r\n")
      end

      # rset_routing_trans_timeout <sec>
      # Stores the routing transaction timeout in @stats on this node only
      # (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to a float greater than 0.
      def ev_rset_routing_trans_timeout(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        timeout = s[1].to_f
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if timeout <= 0

        @stats.routing_trans_timeout = timeout
        send_data("STORED\r\n")
      end

      # set_spushv_read_timeout <sec>
      # Validates the timeout, forwards "rset_spushv_read_timeout" through
      # broadcast_cmd, stores the integer value in @stats and replies with
      # the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_set_spushv_read_timeout(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        timeout = s[1].to_i
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if timeout <= 0

        res = broadcast_cmd("rset_spushv_read_timeout #{s[1]}\r\n")
        @stats.spushv_read_timeout = timeout
        res[@stats.ap_str] = "STORED"
        send_data("#{res}\r\n")
      end

      # rset_spushv_read_timeout <sec>
      # Stores the spushv read timeout in @stats on this node only
      # (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_rset_spushv_read_timeout(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        timeout = s[1].to_i
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if timeout <= 0

        @stats.spushv_read_timeout = timeout
        send_data("STORED\r\n")
      end

      # set_reqpushv_timeout_count <sec>
      # Validates the count, forwards "rset_reqpushv_timeout_count" through
      # broadcast_cmd, stores the integer value in @stats and replies with
      # the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_set_reqpushv_timeout_count(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        count = s[1].to_i
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if count <= 0

        res = broadcast_cmd("rset_reqpushv_timeout_count #{s[1]}\r\n")
        @stats.reqpushv_timeout_count = count
        res[@stats.ap_str] = "STORED"
        send_data("#{res}\r\n")
      end

      # rset_reqpushv_timeout_count <sec>
      # Stores the reqpushv timeout count in @stats on this node only
      # (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_rset_reqpushv_timeout_count(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        count = s[1].to_i
        return send_data("CLIENT_ERROR time value must be lager than 0\r\n") if count <= 0

        @stats.reqpushv_timeout_count = count
        send_data("STORED\r\n")
      end

      # set_spushv_klength_warn <byte>
      # Validates the size, forwards "rset_spushv_klength_warn" through
      # broadcast_cmd, stores the integer value in @stats and replies with
      # the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_set_spushv_klength_warn(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR size value must be larger than 0 \r\n") if size <= 0

        res = broadcast_cmd("rset_spushv_klength_warn #{s[1]}\r\n")
        @stats.spushv_klength_warn = size
        res[@stats.ap_str] = "STORED"
        send_data("#{res}\r\n")
      end

      # rset_spushv_klength_warn <byte>
      # Stores the spushv key-length warning size in @stats on this node
      # only (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_rset_spushv_klength_warn(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR size value must be larger than 0 \r\n") if size <= 0

        @stats.spushv_klength_warn = size
        send_data("STORED\r\n")
      end

      # set_spushv_vlength_warn <byte>
      # Validates the size, forwards "rset_spushv_vlength_warn" through
      # broadcast_cmd, stores the integer value in @stats and replies with
      # the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_set_spushv_vlength_warn(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR size value must be larger than 0 \r\n") if size <= 0

        res = broadcast_cmd("rset_spushv_vlength_warn #{s[1]}\r\n")
        @stats.spushv_vlength_warn = size
        res[@stats.ap_str] = "STORED"
        send_data("#{res}\r\n")
      end

      # rset_spushv_vlength_warn <byte>
      # Stores the spushv value-length warning size in @stats on this node
      # only (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer greater than 0.
      def ev_rset_spushv_vlength_warn(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR size value must be larger than 0\r\n") if size <= 0

        @stats.spushv_vlength_warn = size
        send_data("STORED\r\n")
      end

      # wb_command_map <hash string>
      # ex.
      # {:set=>1,:append=>2,:delete=>3}
      # Parses the hash string built from s[1..-1] (tokens are joined with
      # no separator), forwards "rwb_command_map" through broadcast_cmd,
      # stores the parsed map in @stats and replies with the collected
      # results. Only ":name=>value" pairs are kept; values go through to_i.
      def ev_wb_command_map(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") if s.length < 2

        hash_str = s[1..-1].join
        matched = /^\{(.+)\}$/.match(hash_str)
        return send_data("CLIENT_ERROR hash string parse error\r\n") unless matched

        map = {}
        matched[1].split(',').each do |pair|
          name, num = pair.split('=>')
          # Skip pairs without a value or whose key is not written as a symbol.
          next unless num && name[0] == ':'
          map[name[1..-1].to_sym] = num.to_i
        end

        replies = broadcast_cmd("rwb_command_map #{hash_str}\r\n")
        @stats.wb_command_map = map
        replies[@stats.ap_str] = map.inspect
        send_data("#{replies}\r\n")
      end

      # rwb_command_map <hash string>
      # Parses the hash string built from s[1..-1] (tokens are joined with
      # no separator), stores the parsed map in @stats on this node only
      # (no broadcast) and replies with the map's string form.
      def ev_rwb_command_map(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") if s.length < 2

        matched = /^\{(.+)\}$/.match(s[1..-1].join)
        return send_data("CLIENT_ERROR hash string parse error\r\n") unless matched

        map = {}
        matched[1].split(',').each do |pair|
          name, num = pair.split('=>')
          # Skip pairs without a value or whose key is not written as a symbol.
          next unless num && name[0] == ':'
          map[name[1..-1].to_sym] = num.to_i
        end

        @stats.wb_command_map = map
        send_data("#{map}\r\n")
      end

      # set_wb_shift_size <size>
      # Validates the size, forwards "rset_wb_shift_size" through
      # broadcast_cmd, assigns the value to $roma.wb_writer.shift_size and
      # replies with the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer >= 1.
      def ev_set_wb_shift_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if size < 1

        replies = broadcast_cmd("rset_wb_shift_size #{s[1]}\r\n")
        $roma.wb_writer.shift_size = size
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_wb_shift_size <size>
      # Assigns the value to $roma.wb_writer.shift_size on this node only
      # (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer >= 1.
      def ev_rset_wb_shift_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if size < 1

        $roma.wb_writer.shift_size = size
        send_data("STORED\r\n")
      end

      # set_storage_status [number of file][safecopy|normal]{hash_name}
      # Moves one storage file (index s[1]) of the storage named s[3]
      # (default 'roma') into the requested state and queues the matching
      # async job:
      #   safecopy: :normal           -> :safecopy_flushing, queues 'start_storage_flush_process'
      #   normal:   :safecopy_flushed -> :cachecleaning,     queues 'start_storage_cachecleaning_process'
      # Replies PUSHED on success, otherwise a CLIENT_ERROR line.
      def ev_set_storage_status(s)
        if s.length < 3
          return send_data("CLIENT_ERROR number of arguments (#{s.length - 1} for 2)\r\n")
        end

        hname = s.length >= 4 ? s[3] : 'roma'
        st = @storages[hname]
        unless st
          # The original interpolated a misspelled variable here, so an unknown
          # hash name raised NameError instead of answering the client.
          return send_data("CLIENT_ERROR hash_name = #{hname} does not found\r\n")
        end

        # NOTE(review): a negative index passes the check below and is used
        # as-is for st.dbs[dn]; confirm whether it should be rejected.
        dn = s[1].to_i
        return send_data("CLIENT_ERROR divnum <= #{dn}\r\n") if st.divnum <= dn

        case s[2]
        when 'safecopy'
          if st.dbs[dn] != :normal
            return send_data("CLIENT_ERROR storage[#{dn}] != :normal status\r\n")
          end
          if st.set_db_stat(dn, :safecopy_flushing) == false
            return send_data("CLIENT_ERROR storage[#{dn}] status can't changed\r\n")
          end
          Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_storage_flush_process', [hname, dn]))
        when 'normal'
          if st.dbs[dn] != :safecopy_flushed
            return send_data("CLIENT_ERROR storage[#{dn}] != :safecopy_flushed status\r\n")
          end
          if st.set_db_stat(dn, :cachecleaning) == false
            return send_data("CLIENT_ERROR storage[#{dn}] status can't changed\r\n")
          end
          Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('start_storage_cachecleaning_process', [hname, dn]))
        else
          return send_data("CLIENT_ERROR status parse error\r\n")
        end

        send_data("PUSHED\r\n")
      end

      # set_gui_run_snapshot [true|false]
      # Sets @stats.gui_run_snapshot to the boolean named by s[1] and
      # replies STORED; any other value yields a CLIENT_ERROR. Nothing is
      # broadcast.
      def ev_set_gui_run_snapshot(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        case s[1]
        when 'true'
          @stats.gui_run_snapshot = true
        when 'false'
          @stats.gui_run_snapshot = false
        else
          return send_data("CLIENT_ERROR value must be true or false\r\n")
        end
        send_data("STORED\r\n")
      end

      # set_gui_last_snapshot [%Y/%m/%dT%H:%M:%S]
      # Validates the timestamp string (digits in the shape
      # Y/m/dTH:M:S), forwards "rset_gui_last_snapshot" through
      # broadcast_cmd, stores the string in @stats and replies with the
      # collected results (own result keyed by @stats.ap_str).
      def ev_set_gui_last_snapshot(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        stamp = s[1]
        if stamp !~ /^\d+\/\d+\/\d+T\d+:\d+:\d+$/
          return send_data("CLIENT_ERROR format is [%Y/%m/%dT%H:%M:%S]\r\n")
        end

        res = broadcast_cmd("rset_gui_last_snapshot #{stamp}\r\n")
        @stats.gui_last_snapshot = stamp
        res[@stats.ap_str] = "PUSHED"
        send_data("#{res}\r\n")
      end

      # rset_gui_last_snapshot [%Y/%m/%dT%H:%M:%S]
      # Validates the timestamp string (digits in the shape
      # Y/m/dTH:M:S), stores it in @stats on this node only (no broadcast)
      # and replies PUSHED.
      def ev_rset_gui_last_snapshot(s)
        # Error replies are terminated with CRLF like every other reply.
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        stamp = s[1]
        if stamp !~ /^\d+\/\d+\/\d+T\d+:\d+:\d+$/
          return send_data("CLIENT_ERROR format is [%Y/%m/%dT%H:%M:%S]\r\n")
        end

        @stats.gui_last_snapshot = stamp
        send_data("PUSHED\r\n")
      end

      # set_cleanup_regexp <regexp>
      # Assigns s[1] as cleanup_regexp on every storage in @storages and
      # calls stop_clean_up on each. Refused while
      # @rttable.enabled_failover is falsy.
      # NOTE(review): STORED is sent once per storage, so the number of
      # reply lines follows the number of storages - confirm this is intended.
      def ev_set_cleanup_regexp(s)
        unless s.length == 2
          return send_data("CLIENT_ERROR number of arguments #{s.length-1} to 1\r\n")
        end

        # failover check
        return send_data("CLIENT_ERROR failover disable now!!\r\n") unless @rttable.enabled_failover

        pattern = s[1]
        @storages.each_value do |storage|
          storage.cleanup_regexp = pattern
          storage.stop_clean_up
          send_data("STORED\r\n")
        end
      end

      # set_log_shift_size <size>
      # Validates the size, forwards "rset_log_shift_size" through
      # broadcast_cmd, applies the value to @log and @stats and replies
      # with the collected results (own result keyed by @stats.ap_str).
      # s:: tokenised command line; s[1] must parse to an integer >= 1.
      def ev_set_log_shift_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if size < 1

        replies = broadcast_cmd("rset_log_shift_size #{s[1]}\r\n")
        @log.set_log_shift_size(size)
        @stats.log_shift_size = size
        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_log_shift_size <size>
      # Applies the log shift size to @log and @stats on this node only
      # (no broadcast) and replies STORED.
      # s:: tokenised command line; s[1] must parse to an integer >= 1.
      def ev_rset_log_shift_size(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        size = s[1].to_i
        return send_data("CLIENT_ERROR length must be greater than zero\r\n") if size < 1

        @log.set_log_shift_size(size)
        @stats.log_shift_size = size
        send_data("STORED\r\n")
      end

      # set_log_shift_age <age>
      # Accepts a positive integer, '0', or one of
      # min|hour|daily|weekly|monthly. Forwards "rset_log_shift_age"
      # through broadcast_cmd, applies the value to @log and @stats
      # (numeric input as Integer, keywords as String) and replies with the
      # collected results (own result keyed by @stats.ap_str).
      def ev_set_log_shift_age(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        arg = s[1]
        unless arg.to_i >= 1 || ['0', 'min', 'hour', 'daily', 'weekly', 'monthly'].include?(arg)
          return send_data("CLIENT_ERROR invalid arguments\r\n")
        end

        replies = broadcast_cmd("rset_log_shift_age #{arg}\r\n")

        # Numeric input is applied as an Integer, keywords are passed as-is.
        age = (arg.to_i > 0 || arg == '0') ? arg.to_i : arg
        @log.set_log_shift_age(age)
        @stats.log_shift_age = age

        replies[@stats.ap_str] = "STORED"
        send_data("#{replies}\r\n")
      end

      # rset_log_shift_age <age>
      # Accepts a positive integer, '0', or one of
      # min|hour|daily|weekly|monthly. Applies the value to @log and @stats
      # on this node only (no broadcast) and replies STORED.
      def ev_rset_log_shift_age(s)
        return send_data("CLIENT_ERROR number of arguments\r\n") unless s.length == 2

        arg = s[1]
        unless arg.to_i >= 1 || ['0', 'min', 'hour', 'daily', 'weekly', 'monthly'].include?(arg)
          return send_data("CLIENT_ERROR invalid arguments\r\n")
        end

        # Numeric input is applied as an Integer, keywords are passed as-is.
        age = (arg.to_i > 0 || arg == '0') ? arg.to_i : arg
        @log.set_log_shift_age(age)
        @stats.log_shift_age = age
        send_data("STORED\r\n")
      end

      private

      # Applies one of five priority presets (1 = highest, 3 = default,
      # 5 = lowest) to @stats.stream_copy_wait_param and to every storage's
      # each_vn_dump_sleep / each_vn_dump_sleep_count, then records the
      # priority in @stats.dcnice.
      # Returns "STORED", or a CLIENT_ERROR string when p is not 1..5
      # (in which case nothing is modified).
      def dcnice(p)
        # Preset columns: [stream_copy_wait_param, each_vn_dump_sleep, each_vn_dump_sleep_count]
        copy_wait, dump_sleep, dump_sleep_count =
          case p
          when 1 then [0.001, 0.001, 1000] # highest priority
          when 2 then [0.005, 0.005, 100]
          when 3 then [0.01, 0.001, 10]    # default priority
          when 4 then [0.01, 0.005, 10]
          when 5 then [0.01, 0.01, 10]     # lowest priority
          end
        return "CLIENT_ERROR You sholud input a priority from 1 to 5." unless copy_wait

        @stats.stream_copy_wait_param = copy_wait
        @storages.each_value do |st|
          st.each_vn_dump_sleep = dump_sleep
          st.each_vn_dump_sleep_count = dump_sleep_count
        end
        @stats.dcnice = p
        "STORED"
      end

      # Builds a hash of "config.<NAME>" => value entries from the Config
      # constants. Path constants are expanded with File.expand_path and
      # PLUGIN_FILES is stored as its inspect string.
      # STORAGE_EXCEPTION_ACTION and CONNECTION_DESCRIPTOR_TABLE_SIZE are
      # included only when Config defines them.
      def get_config_stat
        stat = {
          'config.DEFAULT_LOST_ACTION' => Config::DEFAULT_LOST_ACTION,
          'config.LOG_SHIFT_AGE' => Config::LOG_SHIFT_AGE,
          'config.LOG_SHIFT_SIZE' => Config::LOG_SHIFT_SIZE,
          'config.LOG_PATH' => File.expand_path(Config::LOG_PATH),
          'config.RTTABLE_PATH' => File.expand_path(Config::RTTABLE_PATH),
          'config.STORAGE_DELMARK_EXPTIME' => Config::STORAGE_DELMARK_EXPTIME
        }
        if Config.const_defined?(:STORAGE_EXCEPTION_ACTION)
          stat['config.STORAGE_EXCEPTION_ACTION'] = Config::STORAGE_EXCEPTION_ACTION
        end
        stat['config.DATACOPY_STREAM_COPY_WAIT_PARAM'] = Config::DATACOPY_STREAM_COPY_WAIT_PARAM
        stat['config.PLUGIN_FILES'] = Config::PLUGIN_FILES.inspect
        stat['config.WRITEBEHIND_PATH'] = File.expand_path(Config::WRITEBEHIND_PATH)
        stat['config.WRITEBEHIND_SHIFT_SIZE'] = Config::WRITEBEHIND_SHIFT_SIZE
        if Config.const_defined?(:CONNECTION_DESCRIPTOR_TABLE_SIZE)
          stat['config.CONNECTION_DESCRIPTOR_TABLE_SIZE'] = Config::CONNECTION_DESCRIPTOR_TABLE_SIZE
        end
        stat
      end

    end # module SystemCommandReceiver
  end # module Command
end # module Roma