ManageIQ/manageiq

View on GitHub
tools/pg_inspector/active_connections_to_human.rb

Summary

Maintainability
A
30 mins
Test Coverage
F
29%
require 'optimist'
require 'yaml'
require 'time'
begin
  require 'activerecord-id_regions'
rescue LoadError
  $old_version = true
end
require 'pg_inspector/error'
require 'pg_inspector/pg_inspector_operation'
require 'pg_inspector/util'

module PgInspector
  class ActiveConnectionsHumanYAML < PgInspectorOperation
    COMPRESSED_ID_SEPARATOR = 'r'.freeze
    DEFAULT_RAILS_SEQUENCE_FACTOR = 1_000_000_000_000
    RE_COMPRESSED_ID = /^(\d+)#{COMPRESSED_ID_SEPARATOR}(\d+)$/

    HELP_MSG_SHORT = "Dump active connections to human readable YAML file".freeze
    def parse_options(args)
      self.options = Optimist.options(args) do
        opt(:connections, "Active connections file",
            :type => :string, :short => "c", :default => DEFAULT_OUTPUT_PATH.join("#{PREFIX}active_connections.yml").to_s)
        opt(:servers, "Servers information file",
            :type => :string, :short => "s", :default => DEFAULT_OUTPUT_PATH.join("#{PREFIX}server.yml").to_s)
        opt(:output, "Output file",
            :type => :string, :short => "o", :default => DEFAULT_OUTPUT_PATH.join("#{PREFIX}human.yml").to_s)
      end
    end

    def run
      Util.dump_to_yml_file(
        merge_activity_and_server_info(
          process_stat_activity_file(options[:connections]),
          process_miq_info_file(options[:servers])
        ), "Human readable active connections", options[:output]
      )
    end

    private

    def process_stat_activity_file(file_name)
      stat_activities = YAML.load_file(file_name)
      result = {
        "servers"         => [],
        "workers"         => [],
        "connections"     => [],
        "other_processes" => []
      }
      stat_activities.each do |activity|
        if server_activity?(activity)
          current_activity = process_miq_activity(activity)
          push_new_server(result, current_activity)
          push_connection(result, current_activity)
        elsif worker_activity?(activity)
          current_activity = process_miq_activity(activity)
          push_new_worker(result, current_activity)
          push_connection(result, current_activity)
        else
          push_other_process(result, process_other_process(activity))
        end
      end
      result
    rescue => e
      Util.error_exit(e)
    end

    def push_new_server(result, activity)
      result["servers"].each do |server|
        if activity["server_id"] == server["server_id"]
          return result
        end
      end
      result["servers"] << filter_activity_for_server(activity)
      result
    end

    def filter_activity_for_server(activity)
      activity.select do |k, _v|
        %w[server_id
           name
           pid
           zone_id
           zone_name]
          .include?(k)
      end
    end

    def push_new_worker(result, activity)
      result["workers"].each do |worker|
        if activity["worker_id"] == worker["worker_id"]
          return result
        end
      end
      result["workers"] << filter_activity_for_worker(activity)
      result
    end

    def filter_activity_for_worker(activity)
      activity.select do |k, _v|
        %w[worker_id
           server_id
           class_name
           pid]
          .include?(k)
      end
    end

    def push_connection(result, activity)
      result["connections"] << filter_activity_for_connection(activity)
      result
    end

    def filter_activity_for_connection(activity)
      activity.select do |k, _v|
        %w[worker_id
           server_id
           datid
           datname
           spid
           usesysid
           usename
           client_port
           backend_start
           xact_start
           query_start
           state_change
           waiting
           state
           backend_xid
           backend_xmin
           query]
          .include?(k)
      end
    end

    def process_other_process(activity)
      process_activity_shared(activity)
    end

    def push_other_process(result, activity)
      result["other_processes"] << activity
      result
    end

    def old_application_name?(activity)
      activity["application_name"].start_with?("MIQ ")
    end

    def new_application_name?(activity)
      activity["application_name"].start_with?("MIQ|")
    end

    def server_activity?(activity)
      if new_application_name?(activity) && activity["application_name"].split("|")[3] == "-"
        return true
      elsif old_application_name?(activity) && activity["application_name"].include?(" Server")
        return true
      end

      false
    end

    def worker_activity?(activity)
      if new_application_name?(activity) && activity["application_name"].split("|")[3] != "-"
        return true
      elsif old_application_name?(activity) && !activity["application_name"].include?(" Server")
        return true
      end

      false
    end

    def process_activity_shared(activity)
      activity["datid"] = activity["datid"].to_i
      activity["spid"] = activity["pid"].to_i
      activity.delete("pid")
      activity["usesysid"] = activity["usesysid"].to_i
      activity["client_port"] = activity["client_port"].to_i
      activity["backend_xid"] = activity["backend_xid"].to_i if activity["backend_xid"]
      activity["backend_xmin"] = activity["backend_xmin"].to_i if activity["backend_xmin"]
      activity["backend_start"] = to_utc(activity["backend_start"])
      activity["xact_start"] = to_utc(activity["xact_start"])
      activity["query_start"] = to_utc(activity["query_start"])
      activity["state_change"] = to_utc(activity["state_change"])
      activity
    end

    def process_miq_activity(activity)
      process_activity_shared(activity)
      process_miq_activity_application_name(activity)
    end

    def to_utc(time_str)
      Time.parse(time_str).utc.to_s if time_str
    end

    def process_miq_activity_application_name(activity)
      # Previous format, before https://github.com/ManageIQ/manageiq/pull/15545
      # For server:
      # MIQ <pid> Server[<server_compressed_id>], <zone.name>[<zone.compressed_id>]
      # For worker:
      # MIQ <pid> <minimal_worker_class_name>[<worker_compressed_id>], s[<server_compressed_id>], <zone.name>[<zone.compressed_id>]
      # Current :application_name format:
      # MIQ|<pid>|<server_id>|<worker_id>|<zone_id>|<class_name>|<zone_name>
      # Both previous and current is truncated up to 64 characters
      if activity["application_name"].end_with?("...")
        $stderr.puts("Warning: the application_name #{activity["application_name"]} is incomplete.")
      end
      if new_application_name?(activity)
        _, pid, server_id, worker_id, zone_id, class_name, zone_name = activity["application_name"].split("|")
      elsif activity["application_name"].include?(" Server")
        # old application name, server activity
        _, pid, class_name, server_id, zone_name, zone_id = activity["application_name"].split(/[, \[\]]+/)
        worker_id = "-"
      else
        # old application name, worker activity
        _, pid, class_name, worker_id, _, server_id, zone_name, zone_id = activity["application_name"].split(/[, \[\]]+/)
      end
      activity["pid"] = pid.to_i
      activity["class_name"] = class_name
      activity["server_id"] = uncompress_id(server_id)
      activity["worker_id"] = uncompress_id(worker_id)
      activity["zone_id"] = uncompress_id(zone_id)
      activity["zone_name"] = zone_name
      activity
    end

    def process_miq_info_file(file_name)
      servers = YAML.load_file(file_name)
      result = []
      servers.each { |server| result << process_miq_server(server) }
      result
    rescue => e
      Util.error_exit(e)
    end

    def process_miq_server(server)
      server["server_id"] = server["id"].to_i
      server.select do |k, _v|
        %w[server_id
           hostname
           ipaddress]
          .include?(k)
      end
    end

    def rails_sequence_factor
      DEFAULT_RAILS_SEQUENCE_FACTOR
    end

    def id_to_region(id)
      id.to_i / rails_sequence_factor
    end

    def split_id(id)
      id = uncompress_id(id)

      region_number = id_to_region(id)
      short_id      = region_number.zero? ? id : id % (region_number * rails_sequence_factor)

      return region_number, short_id
    end

    def compress_id(id)
      if $old_version
        return nil if id.nil?

        region_number, short_id = split_id(id)
        region_number.zero? ? short_id.to_s : "#{region_number}#{COMPRESSED_ID_SEPARATOR}#{short_id}"
      else
        Class.new.include(ActiveRecord::IdRegions).compress_id(id)
      end
    end

    def uncompress_id(id)
      if $old_version
        return nil if id.nil?

        id.to_s =~ RE_COMPRESSED_ID ? ($1.to_i * rails_sequence_factor + $2.to_i) : id.to_i
      else
        Class.new.include(ActiveRecord::IdRegions).uncompress_id(id)
      end
    end

    def merge_activity_and_server_info(stat_activities, servers)
      server_activities = array_of_hash_to_hash(
        stat_activities["servers"], "server_id"
      )
      servers.each do |server|
        server_activities[server["server_id"]] =
          server_activities.fetch(server["server_id"], {}).merge(server)
      end
      stat_activities["servers"] = hash_val_array(server_activities)
      stat_activities
    end

    def array_of_hash_to_hash(array, key)
      result = {}
      array.each do |e|
        result[e[key]] = e
      end
      result
    end

    def hash_val_array(hash)
      result = []
      hash.each do |_k, v|
        result << v
      end
      result
    end
  end
end