karafka/karafka

View on GitHub
lib/karafka/connection/manager.rb

Summary

Maintainability
A
25 mins
Test Coverage
# frozen_string_literal: true

module Karafka
  # Namespace for Kafka connection related logic
  module Connection
    # Connections manager responsible for starting and managing listeners connections
    #
    # In the OSS version it starts listeners as they are without any connection management or
    # resources utilization supervision and shuts them down or quiets  when time has come
    class Manager
      def initialize
        @once_executions = Set.new
      end

      # Registers provided listeners and starts all of them
      #
      # @param listeners [Connection::ListenersBatch]
      def register(listeners)
        @listeners = listeners
        @listeners.each(&:start!)
      end

      # @return [Boolean] true if all listeners are stopped
      def done?
        @listeners.all?(&:stopped?)
      end

      # Controls the state of listeners upon shutdown and quiet requests
      # In both cases (quieting and shutdown) we first need to stop processing more work and tell
      # listeners to become quiet (connected but not yielding messages) and then depending on
      # whether we want to stop fully or just keep quiet we apply different flow.
      #
      # @note It is important to ensure, that all listeners from the same consumer group are always
      #   all quiet before we can fully shutdown given consumer group. Skipping this can cause
      #   `Timed out LeaveGroupRequest in flight` and other errors. For the simplification, we just
      #   quiet all and only then move forward.
      #
      # @note This manager works with the assumption, that all listeners are executed on register.
      def control
        # Do nothing until shutdown or quiet
        return unless Karafka::App.done?

        # When we are done processing, immediately quiet all the listeners so they do not pick up
        # new work to do
        once(:quiet!) { @listeners.each(&:quiet!) }

        return unless @listeners.all?(&:quiet?)

        # If we are in the process of moving to quiet state, we need to check it.
        # Switch to quieted status only when all listeners are fully quieted and do nothing after
        # that until further state changes
        once(:quieted!) { Karafka::App.quieted! } if Karafka::App.quieting?

        return if Karafka::App.quiet?

        once(:stop!) { @listeners.each(&:stop!) }
      end

      private

      # Runs code only once and never again
      # @param args [Object] anything we want to use as a set of unique keys for given execution
      def once(*args)
        return if @once_executions.include?(args)

        @once_executions << args

        yield
      end
    end
  end
end