cloudfoundry/health_manager

View on GitHub
lib/health_manager/droplet.rb

Summary

Maintainability
C
7 hrs
Test Coverage
require 'set'
require 'health_manager/app_instance'

module HealthManager
  # This class provides answers about a droplet's state.
  class Droplet
    include HealthManager::Common

    # Read-only views of the droplet id, the fields copied from the desired
    # state (state, live_version, num_instances, package_state, last_updated)
    # and the crash / extra-instance bookkeeping hashes.
    attr_reader :id, :state, :live_version, :num_instances, :package_state, :last_updated, :crashes, :extra_instances
    # Starts out true in #initialize and is set to false by #set_desired_state;
    # callers may also write it directly.
    attr_accessor :desired_state_update_required

    # Builds an empty droplet record with no desired state yet.
    #
    # @param id [#to_s] droplet identifier; stored as a String
    def initialize(id)
      @id = id.to_s
      @num_instances = 0
      # version => { 'instances' => { index => instance } } (see get_version / get_instance)
      @versions = {}
      # instance guid => { 'timestamp' => ..., 'crash_timestamp' => ... }
      @crashes = {}
      # instance guid => { version: ..., reason: ... }
      @extra_instances = {}
      # stamps @reset_timestamp with the current time
      reset_missing_indices

      # start out as stale until desired state is set
      @desired_state_update_required = true
      @desired_state_update_timestamp = now
    end

    # Reports whether the last desired-state timestamp (set at construction
    # and on every #set_desired_state) is older than the
    # :droplet_gc_grace_period interval.
    def ripe_for_gc?
      grace_period = interval(:droplet_gc_grace_period)
      timestamp_older_than?(@desired_state_update_timestamp, grace_period)
    end

    # Copies the desired state reported for this droplet into the record and
    # marks the desired state as freshly updated.
    #
    # @param desired_droplet [Hash] must hold truthy values under the string
    #   keys 'state', 'instances', 'version', 'package_state' and 'updated_at'
    # @raise [ArgumentError] naming the first required key that is missing
    def set_desired_state(desired_droplet)
      logger.debug("bulk: #set_desired_state", { actual: { instances: all_instances_report }, desired_droplet: desired_droplet })

      absent_key = %w[state instances version package_state updated_at].find do |key|
        !desired_droplet[key]
      end
      if absent_key
        raise ArgumentError, "Value #{absent_key} is required, missing from #{desired_droplet}"
      end

      # NOTE(review): this assigns @num_instances directly, so the
      # num_instances= writer (which also resets the missing-indices
      # timestamp) is bypassed here -- confirm that is intended.
      @num_instances = desired_droplet['instances']
      @state         = desired_droplet['state']
      @live_version  = desired_droplet['version']
      @package_state = desired_droplet['package_state']
      @last_updated  = parse_utc(desired_droplet['updated_at'])

      # the desired state is now known, so the droplet is no longer stale
      @desired_state_update_required = false
      @desired_state_update_timestamp = now
    end

    # Serialises every instance variable through encode_json, keyed by the
    # variable name with its leading "@" stripped. Extra arguments are
    # accepted and ignored.
    def to_json(*a)
      payload = instance_variables.each_with_object({}) do |ivar, fields|
        fields[ivar.to_s[1..-1]] = instance_variable_get(ivar)
      end
      encode_json(payload)
    end

    # Lists the live-version instances that are waiting to be restarted.
    #
    # Only the desired indices 0...num_instances are inspected (exclusive
    # upper bound, matching missing_indices). An inclusive range would also
    # visit index num_instances, and because get_instance creates entries on
    # demand it would register a phantom instance beyond the desired count.
    #
    # @return [Array] instances whose pending_restart? is truthy, in index order
    def pending_restarts
      (0...num_instances).map { |index| get_instance(index) }.select(&:pending_restart?)
    end

    # Applies one heartbeat to the matching instance (looked up by the beat's
    # index and version) and updates the extra-instance and crash tables.
    #
    # @extra_instances is emptied first, so afterwards it holds at most the
    # one guid the instance asked to prune in response to this beat.
    # NOTE(review): that also drops anything recorded earlier (e.g. by
    # update_extra_instances) -- confirm this is intended.
    #
    # @param beat heartbeat object responding to index, version, state,
    #   instance_guid and state_timestamp
    # @return [Hash, nil] the crash record when the beat state is CRASHED
    def process_heartbeat(beat)
      @extra_instances.clear

      instance = get_instance(beat.index, beat.version)
      instance.receive_heartbeat(beat)

      stale_guid = instance.extra_instance_guid_to_prune
      if stale_guid
        @extra_instances[stale_guid] = {
          version: beat.version,
          reason: "Instance mismatch, pruning: #{stale_guid}"
        }
      end

      return unless beat.state == CRASHED

      @crashes[beat.instance_guid] = {
        'timestamp' => now,
        'crash_timestamp' => beat.state_timestamp
      }
    end

    # True when the droplet was not reset recently and at least one desired
    # index is reported by missing_indices.
    def has_missing_indices?
      # TODO: add test
      return false if reset_recently?

      !missing_indices.empty?
    end

    # Rebuilds @extra_instances from scratch by walking every known version
    # and pruning instance entries that should not be kept.
    #
    # For each instance entry, in iteration order:
    #   * an instance that is starting/running but has no recent heartbeat is
    #     marked down first;
    #   * the entry is pruned when the droplet is STOPPED, when the number of
    #     entries already kept has reached num_instances, or when its version
    #     is not the live version (checked in that priority order);
    #   * a pruned entry is removed from its version table, and is recorded in
    #     @extra_instances only if it is still starting/running.
    # Versions left without instances are then dropped via
    # delete_versions_without_instances.
    def update_extra_instances
      @extra_instances.clear

      # counts every entry that is kept (not pruned), across all versions,
      # regardless of the kept instance's own state
      num_running = 0

      # first, go through each version and prune indices
      versions.each do |version, version_entry|
        version_entry['instances'].delete_if do |_, instance|  # deleting extra instances
          # must happen before the starting_or_running? check further down
          if instance.starting_or_running? && !instance.has_recent_heartbeat?
            instance.down!
          end

          # nil when the entry should be kept
          prune_reason =
            if state == STOPPED
              "Droplet state is STOPPED"
            elsif num_running >= num_instances
              "Extra instance"
            elsif version != live_version
              "Live version mismatch"
            end

          if prune_reason
            logger.debug1 { "pruning: #{prune_reason}" }
            if instance.starting_or_running?
              @extra_instances[instance.guid] = {
                version: version,
                reason: prune_reason
              }
            end

            # truthy block result => delete_if removes this entry
            true
          else
            num_running += 1

            # keep this entry
            false
          end
        end
      end

      delete_versions_without_instances
    end

    # Stamps @reset_timestamp with the current time. reset_recently? compares
    # against this stamp, and has_missing_indices? reports nothing while the
    # reset is still recent.
    def reset_missing_indices
      @reset_timestamp = now
    end

    # Returns the desired indices (0...num_instances) of the live version
    # whose instance reports missing?.
    #
    # Always empty unless the droplet is STARTED and its package is STAGED.
    # Possibly add other sanity checks here to ensure valid running state,
    # e.g. valid version, etc.
    #
    # @return [Array<Integer>]
    def missing_indices
      expected_to_run = @state == STARTED && @package_state == STAGED
      return [] unless expected_to_run

      (0...num_instances).select { |index| get_instance(index).missing? }
    end

    # Drops crash records whose 'timestamp' is older than the
    # :flapping_timeout interval.
    #
    # @return [Hash] the (mutated) crash table
    def prune_crashes
      @crashes.delete_if do |_guid, crash|
        recorded_at = crash['timestamp']
        timestamp_older_than?(recorded_at, interval(:flapping_timeout))
      end
    end

    # Sets the desired instance count and restarts the missing-indices grace
    # period by calling reset_missing_indices.
    #
    # @param val desired number of instances
    # @return the stored value
    def num_instances=(val)
      @num_instances = val
      reset_missing_indices
      @num_instances
    end

    # True while @reset_timestamp is fresher than the :droplet_lost interval.
    # An unset interval is treated as 0.
    def reset_recently?
      max_age = interval(:droplet_lost) || 0
      timestamp_fresher_than?(@reset_timestamp, max_age)
    end

    # Predicate form of the desired_state_update_required flag: true from
    # construction until set_desired_state clears it (or a caller rewrites it).
    def desired_state_update_required?
      @desired_state_update_required
    end

    # Handles a crash notification: marks the addressed instance as crashed
    # and records the crash under the instance's own guid.
    #
    # @param message [Hash] must contain :index, :version, :instance and
    #   :crash_timestamp (read with fetch, so a missing key raises KeyError)
    # @return [Hash] the crash record that was stored
    def process_exit_crash(message)
      index = message.fetch(:index)
      version = message.fetch(:version)
      reported_guid = message.fetch(:instance)

      instance = get_instance(index, version, reported_guid)
      instance.crash!(message.fetch(:crash_timestamp))

      # the tracked instance may already carry a different guid than reported
      if instance.guid != reported_guid
        logger.warn { "unexpected instance_id: #{reported_guid}, desired: #{instance.guid}" }
      end

      @crashes[instance.guid] = {
        'timestamp' => now,
        'crash_timestamp' => instance.last_crash_timestamp
      }
    end

    # Asks the instance at (version, index) to mark itself down for the given
    # guid. Note the argument order here is version first, index second.
    #
    # @return whatever the instance's mark_as_down_for_guid returns
    def mark_instance_as_down(version, index, instance_id)
      target = get_instance(index, version)
      target.mark_as_down_for_guid(instance_id)
    end

    # Collects, across every known version, the instances that report
    # starting_or_running?.
    #
    # @return [Array] instances in version order, then index-table order
    def all_starting_or_running_instances
      versions.each_with_object([]) do |(version, _entry), active|
        active.concat(get_instances(version).values.select(&:starting_or_running?))
      end
    end

    # Returns the index => instance table for the given version (the live
    # version by default). Goes through get_version, so an entry for an
    # unknown version is created on the fly.
    def get_instances(version = @live_version)
      get_version(version)['instances']
    end

    # Fetches the instance at the given index of a version (the live version
    # by default), creating and storing a new AppInstance when none is
    # recorded yet.
    #
    # @param index index of the instance within the version
    # @param version version to look in
    # @param instance_guid guid handed to AppInstance.new; only used when a
    #   new instance has to be created
    def get_instance(index, version = @live_version, instance_guid = nil)
      instances = get_instances(version)
      instances[index] ||= AppInstance.new(version, index, instance_guid)
    end

    # Looks up the instance at the given index without creating one; returns
    # nil when no instance is recorded there. Unlike get_instance no
    # AppInstance is built, although get_instances still creates an empty
    # entry for a version that is not known yet.
    def safe_get_instance(index, version = @live_version)
      get_instances(version)[index]
    end

    # Adds this droplet's contribution to the realtime counters in varz.
    #
    # Totals (apps, instances, crashes) are always bumped. For a STARTED
    # droplet the per-state instance counters are bumped too, both at the top
    # level and under varz[:running]; instances in any other state are not
    # counted.
    #
    # @param varz hash-like counter store, mutated in place
    def update_realtime_varz(varz)
      varz[:total_apps] += 1
      varz[:total_instances] += num_instances
      varz[:crashed_instances] += crashes.size
      return unless state == STARTED

      varz[:running][:apps] += 1

      num_instances.times do |index|
        # pick the counter that matches the instance state, if any
        counter =
          case get_instance(index).state
          when STARTING, RUNNING then :running_instances
          when DOWN then :missing_instances
          when FLAPPING then :flapping_instances
          end
        next unless counter

        varz[counter] += 1
        varz[:running][counter] += 1
      end

      varz[:running][:crashes] += crashes.size
    end

    # A droplet counts as extra when its desired state has not been refreshed
    # within the :desired_state_lost interval (see desired_state_update_overdue?).
    def is_extra?
      desired_state_update_overdue?
    end

    # Sums running_guid_count over the instances of every known version.
    #
    # The inner accumulator has its own name so that it no longer shadows the
    # outer one.
    #
    # @return [Hash] version => total running guid count (0 for a version
    #   without instances)
    def number_of_running_instances_by_version
      versions.each_with_object({}) do |(version, version_entry), counts|
        counts[version] = version_entry["instances"].inject(0) do |total, (_index, instance)|
          total + instance.running_guid_count
        end
      end
    end

    private
    # Internal version table; get_version shows the shape of each entry
    # ({'instances' => {}}).
    attr_reader :versions

    # Removes version entries that have no instances left. An empty entry for
    # the live version is kept unless the droplet is STOPPED.
    #
    # @return [Hash] the (mutated) version table
    def delete_versions_without_instances
      versions.delete_if do |version, version_entry|
        next false unless version_entry['instances'].empty?

        @state == STOPPED || version != @live_version
      end
    end

    # True when the last desired-state update is older than the
    # :desired_state_lost interval.
    def desired_state_update_overdue?
      lost_after = interval(:desired_state_lost)
      timestamp_older_than?(@desired_state_update_timestamp, lost_after)
    end

    # True when timestamp is positive and lies less than age before now.
    # A non-positive timestamp is never fresh.
    def timestamp_fresher_than?(timestamp, age)
      return false unless timestamp > 0

      elapsed = now - timestamp
      elapsed < age
    end

    # True when timestamp is positive and lies more than age before now.
    # A non-positive timestamp is never considered old.
    def timestamp_older_than?(timestamp, age)
      return false unless timestamp > 0

      elapsed = now - timestamp
      elapsed > age
    end

    # Converts a time string into integer seconds since the epoch.
    #
    # NOTE(review): Time.parse comes from the stdlib 'time' library, which
    # this file does not require itself -- presumably loaded elsewhere; confirm.
    # NOTE(review): despite the name no UTC zone is forced here; a string
    # without zone information is presumably read in the local zone --
    # confirm callers always pass zone-qualified timestamps.
    def parse_utc(time)
      Time.parse(time).to_i
    end

    # Returns the entry for a version (the live version by default), creating
    # an entry with an empty 'instances' table when the version is unknown.
    def get_version(version = @live_version)
      versions[version] ||= {'instances' => {}}
    end

    # Builds a flat summary (state, version, guid, index, crash_count) of
    # every instance in every known version; used for debug logging.
    #
    # @return [Array<Hash>]
    def all_instances_report
      versions.each_with_object([]) do |(version, _entry), report|
        get_instances(version).each do |_index, instance|
          report << {
            state: instance.state,
            version: instance.version,
            guid: instance.guid,
            index: instance.index,
            crash_count: instance.crash_count
          }
        end
      end
    end
  end
end