ManageIQ/kubeclient

lib/kubeclient/informer.rb

module Kubeclient
  # caches results for multiple consumers to share and keeps them updated with a watch
  class Informer
    def initialize(client, resource_name, options: {}, reconcile_timeout: 15 * 60, logger: nil)
      @client = client
      @resource_name = resource_name
      @reconcile_timeout = reconcile_timeout
      @logger = logger
      @cache = nil
      @started = nil
      @stopped = false
      @watching = []
      @options = options
    end

    def list
      @cache.values
    end

    def watch(&block)
      with_watching(&block)
    end

    # not started implicitly so users know they have to call `stop_worker`
    def start_worker
      @stopped = false
      @worker = Thread.new do
        until @stopped
          begin
            fill_cache
            stop_reason = watch_to_update_cache
            @logger&.info("watch restarted: #{stop_reason}")
          rescue StandardError => e
            # need to keep retrying since we work in the background
            @logger&.error("ignoring error during background work #{e}")
          ensure
            sleep(1) # do not overwhelm the api-server if we are somehow broken
          end
        end
      end
      sleep(0.01) until @cache || @stopped # block until the initial list has filled the cache
    end

    def stop_worker
      @stopped = true # mark that any running threads should stop
      pass_and_run(@worker) # skip the end-of-loop sleep so the worker exits promptly
      @worker.join
    end

    private

    def with_watching
      queue = Queue.new
      @watching << queue
      loop do
        x = queue.pop
        yield(x)
      end
    ensure
      @watching.delete(queue)
    end

    def cache_key(resource)
      resource.dig(:metadata, :uid)
    end

    def fill_cache
      get_options = @options.merge(raw: true, resource_version: '0')
      reply = @client.get_entities(nil, @resource_name, get_options)
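      # the list reply is expected to look roughly like (a sketch, not an exact schema):
      #   { metadata: { resourceVersion: '12345' }, items: [{ metadata: { uid: '...' }, ... }, ...] }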
      @cache = reply[:items].each_with_object({}) do |item, h|
        h[cache_key(item)] = item
      end
      @started = reply.dig(:metadata, :resourceVersion)
    end

    def watch_to_update_cache
      watcher_with_timeout do |watcher|
        stop_reason = 'disconnect'

        watcher.each do |notice|
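          # each notice is expected to look roughly like:
          #   { type: 'ADDED' | 'MODIFIED' | 'DELETED' | 'ERROR', object: { metadata: { uid: '...' }, ... } }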
          case notice[:type]
          when 'ADDED', 'MODIFIED' then @cache[cache_key(notice[:object])] = notice[:object]
          when 'DELETED' then @cache.delete(cache_key(notice[:object]))
          when 'ERROR'
            stop_reason = 'error'
            break
          else
            @logger&.error("Unsupported event type #{notice[:type]}")
          end
          @watching.each { |q| q << notice }
        end

        stop_reason
      end
    end

    def watcher_with_timeout
      watch_options = @options.merge(watch: true, resource_version: @started)
      @watcher = @client.watch_entities(@resource_name, watch_options)
      timeout_deadline = Time.now + @reconcile_timeout
      watcher_finished = false

      finisher_thread = Thread.new do
        sleep(0.5) until @stopped || watcher_finished || Time.now > timeout_deadline
        # keep calling finish until the watch block has actually exited,
        # since watcher.each may only start after finish was called in this thread
        loop do
          @watcher.finish
          break if watcher_finished
          sleep(0.5)
        end
      end

      stop_reason = yield(@watcher)
      Time.now > timeout_deadline ? 'reconcile' : stop_reason # return the reason
    ensure
      watcher_finished = true
      pass_and_run(finisher_thread) # skip the sleep to evaluate exit condition
      finisher_thread.join
    end

    def pass_and_run(thread)
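      # nudge a thread that may be sleeping so it re-checks its exit condition immediately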
      Thread.pass
      thread.run
    rescue ThreadError
      # thread was already dead
    end
  end
end
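
A minimal usage sketch (not part of this file): the server URL, resource name and logger below are illustrative, authentication options are omitted, and the exact shape of cached objects and notices depends on how the client is configured.

require 'logger'
require 'kubeclient'

client   = Kubeclient::Client.new('https://kubernetes.example.test/api', 'v1')
informer = Kubeclient::Informer.new(client, 'pods', logger: Logger.new($stdout))

informer.start_worker # fills the cache, then keeps it updated in a background thread

# read the cached list at any time
informer.list.each { |pod| puts pod.dig(:metadata, :name) }

# subscribe to updates; `watch` blocks, so run it in its own thread
subscriber = Thread.new do
  informer.watch do |notice|
    puts "#{notice[:type]} #{notice.dig(:object, :metadata, :name)}"
  end
end

# ... later, shut everything down
informer.stop_worker
subscriber.kill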