karafka/karafka

View on GitHub
lib/karafka/connection/listener.rb

Summary

Maintainability
C
7 hrs
Test Coverage
# frozen_string_literal: true

module Karafka
  module Connection
    # A single listener that listens to incoming messages from a single subscription group.
    # It polls the messages and then enqueues jobs. It also takes care of potential recovery from
    # critical errors by restarting everything in a safe manner.
    #
    # This is the heart of the consumption process.
    #
    # It provides async API for managing, so all status changes are expected to be async.
    class Listener
      include Helpers::Async

      # Can be useful for logging
      # @return [String] id of this listener
      attr_reader :id

      # @return [Karafka::Routing::SubscriptionGroup] subscription group that this listener handles
      attr_reader :subscription_group

      # How long to wait in the initial events poll. Increases chances of having the initial events
      # immediately available
      INITIAL_EVENTS_POLL_TIMEOUT = 100

      private_constant :INITIAL_EVENTS_POLL_TIMEOUT

      # @param subscription_group [Karafka::Routing::SubscriptionGroup]
      # @param jobs_queue [Karafka::Processing::JobsQueue] queue where we should push work
      # @param scheduler [Karafka::Processing::Scheduler] scheduler we want to use
      # @return [Karafka::Connection::Listener] listener instance
      def initialize(subscription_group, jobs_queue, scheduler)
        proc_config = ::Karafka::App.config.internal.processing

        @id = SecureRandom.hex(6)
        @subscription_group = subscription_group
        @jobs_queue = jobs_queue
        @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
        @client = Client.new(@subscription_group, -> { running? })
        @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
        @jobs_builder = proc_config.jobs_builder
        @partitioner = proc_config.partitioner_class.new(subscription_group)
        @scheduler = scheduler
        @events_poller = Helpers::IntervalRunner.new { @client.events_poll }
        # We keep one buffer for messages to preserve memory and not allocate extra objects
        # We can do this that way because we always first schedule jobs using messages before we
        # fetch another batch.
        @messages_buffer = MessagesBuffer.new(subscription_group)
        @mutex = Mutex.new
        @status = Status.new

        @jobs_queue.register(@subscription_group.id)

        # This makes sure that even if we tick more often than the interval time due to frequent
        # unlocks from short-lived jobs or async queues synchronization, events handling and jobs
        # scheduling still happens with the expected frequency
        @interval_runner = Helpers::IntervalRunner.new do
          @events_poller.call
          @scheduler.on_manage
        end
      end

      # Runs the main listener fetch loop.
      #
      # @note Prefetch callbacks can be used to seek offset or do other things before we actually
      #   start consuming data
      def call
        Karafka.monitor.instrument(
          'connection.listener.before_fetch_loop',
          caller: self,
          client: @client,
          subscription_group: @subscription_group
        )

        fetch_loop

        Karafka.monitor.instrument(
          'connection.listener.after_fetch_loop',
          caller: self,
          client: @client,
          subscription_group: @subscription_group
        )
      end

      # Aliases all statuses operations directly on the listener so we have a listener-facing API
      Status::STATES.each do |state, transition|
        # @return [Boolean] is the listener in a given state
        define_method "#{state}?" do
          @status.public_send("#{state}?")
        end

        # Moves listener to a given state
        define_method transition do
          @status.public_send(transition)
        end
      end

      # @return [Boolean] is this listener active (not stopped and not pending)
      def active?
        @status.active?
      end

      # We overwrite the state `#start` because on start we need to also start running listener in
      # the async thread. While other state transitions happen automatically and status state
      # change is enough, here we need to run the background threads
      def start!
        if stopped?
          @client.reset
          @status.reset!
        end

        @status.start!

        async_call("karafka.listener##{@subscription_group.id}")
      end

      # Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and
      # stops kafka client.
      #
      # @note This method is not private despite being part of the fetch loop because in case of
      #   a forceful shutdown, it may be invoked from a separate thread
      #
      # @note We wrap it with a mutex exactly because of the above case of forceful shutdown
      def shutdown
        @mutex.synchronize do
          return if stopped?
          # Nothing to clear if it was not even running
          return stopped! if pending?

          @executors.clear
          @coordinators.reset
          @client.stop

          stopped!
        end
      end

      private

      # Fetches the data and adds it to the jobs queue.
      #
      # @note We catch all the errors here, so they don't affect other listeners (or this one)
      #   so we will be able to listen and consume other incoming messages.
      #   Since it is run inside Karafka::Connection::Runner thread - catching all the exceptions
      #   won't crash the whole process. Here we mostly focus on catching the exceptions related to
      #   Kafka connections / Internet connection issues / Etc. Business logic problems should not
      #   propagate this far.
      def fetch_loop
        running!
        # Run the initial events fetch to improve chances of having metrics and initial callbacks
        # triggers on start.
        #
        # In theory this may slow down the initial boot but we limit it up to 100ms, so it should
        # not have a big initial impact. It may not be enough but Karafka does not give the boot
        # warranties of statistics or other callbacks being immediately available, hence this is
        # a fair trade-off
        @client.events_poll(INITIAL_EVENTS_POLL_TIMEOUT)

        # Run the main loop as long as we are not stopping or moving into quiet mode
        while running?
          Karafka.monitor.instrument(
            'connection.listener.fetch_loop',
            caller: self,
            client: @client,
            subscription_group: @subscription_group
          )

          resume_paused_partitions

          Karafka.monitor.instrument(
            'connection.listener.fetch_loop.received',
            caller: self,
            client: @client,
            subscription_group: @subscription_group,
            messages_buffer: @messages_buffer
          ) do
            # We need to fetch data before we revoke lost partitions details as during the polling
            # the callbacks for tracking lost partitions are triggered. Otherwise we would be
            # always one batch behind.
            poll_and_remap_messages
          end

          # If there were revoked partitions, we need to wait on their jobs to finish before
          # distributing consuming jobs as upon revoking, we might get assigned to the same
          # partitions, thus getting their jobs. The revoking jobs need to finish before
          # appropriate consumers are taken down and re-created
          build_and_schedule_revoked_jobs_for_revoked_partitions

          # We wait only on jobs from our subscription group. Other groups are independent.
          # This will block on revoked jobs until they are finished. Those are not meant to last
          # long and should not have any bigger impact on the system. Doing this in a blocking way
          # simplifies the overall design and prevents from race conditions
          wait

          build_and_schedule_flow_jobs

          # periodic jobs never run on topics and partitions that were scheduled, so no risk in
          # having collective wait after both
          build_and_schedule_periodic_jobs if Karafka.pro?

          wait
        end

        # If we are stopping we will no longer schedule any regular jobs despite polling.
        # We need to keep polling not to exceed the `max.poll.interval` for long-running
        # non-blocking jobs and we need to allow them to finish. We however do not want to
        # enqueue any new jobs. It's worth keeping in mind that it is the end user responsibility
        # to detect shutdown in their long-running logic or else Karafka will force shutdown
        # after a while.
        #
        # We do not care about resuming any partitions or lost jobs as we do not plan to do
        # anything with them as we're in the shutdown phase.
        #
        # What we do care however is the ability to still run revocation jobs in case anything
        # would change in the cluster. We still want to notify the long-running jobs about changes
        # that occurred in the cluster.
        wait_pinging(
          wait_until: -> { @jobs_queue.empty?(@subscription_group.id) },
          after_ping: -> { build_and_schedule_revoked_jobs_for_revoked_partitions }
        )

        # We do not want to schedule the shutdown jobs prior to finishing all the jobs
        # (including non-blocking) as there might be a long-running job with a shutdown and then
        # we would run two jobs in parallel for the same executor and consumer. We do not want that
        # as it could create a race-condition.
        build_and_schedule_shutdown_jobs

        # Wait until all the shutdown jobs are done
        wait_pinging(wait_until: -> { @jobs_queue.empty?(@subscription_group.id) })

        quieted!

        # Wait if we're in the process of finishing started work or finished all the work and
        # just sitting and being quiet
        wait_pinging(wait_until: -> { !quiet? })

        # This extra ping will make sure we've refreshed the rebalance state after other instances
        # potentially shutdown. This will prevent us from closing with a dangling callback
        @client.ping

        shutdown

        # This is on purpose - see the notes for this method
        # rubocop:disable Lint/RescueException
      rescue Exception => e
        # rubocop:enable Lint/RescueException
        Karafka.monitor.instrument(
          'error.occurred',
          caller: self,
          error: e,
          type: 'connection.listener.fetch_loop.error'
        )

        reset

        sleep(1) && retry
      end

      # Resumes processing of partitions that were paused due to an error.
      def resume_paused_partitions
        @coordinators.resume do |topic, partition|
          @client.resume(topic.name, partition)
        end
      end

      # Polls messages within the time and amount boundaries defined in the settings and then
      # builds karafka messages based on the raw rdkafka messages buffer returned by the
      # `#batch_poll` method.
      #
      # @note There are two buffers, one for raw messages and one for "built" karafka messages
      def poll_and_remap_messages
        @messages_buffer.remap(
          @client.batch_poll
        )
      end

      # Enqueues revoking jobs for partitions that were taken away from the running process.
      def build_and_schedule_revoked_jobs_for_revoked_partitions
        revoked_partitions = @client.rebalance_manager.revoked_partitions

        # Stop early to save on some execution and array allocation
        return if revoked_partitions.empty?

        jobs = []

        revoked_partitions.each do |topic, partitions|
          partitions.each do |partition|
            @coordinators.revoke(topic, partition)

            # There may be a case where we have lost partition of which data we have never
            # processed (if it was assigned and revoked really fast), thus we may not have it
            # here. In cases like this, we do not run a revocation job
            @executors.find_all(topic, partition).each do |executor|
              executor.coordinator.increment(:revoked)
              jobs << @jobs_builder.revoked(executor)
            end

            # We need to remove all the executors of a given topic partition that we have lost, so
            # next time we pick up it's work, new executors kick in. This may be needed especially
            # for LRJ where we could end up with a race condition
            # This revocation needs to happen after the jobs are scheduled, otherwise they would
            # be scheduled with new executors instead of old
            @executors.revoke(topic, partition)
          end
        end

        return if jobs.empty?

        jobs.each(&:before_schedule)
        @scheduler.on_schedule_revocation(jobs)
      end

      # Enqueues the shutdown jobs for all the executors that exist in our subscription group
      def build_and_schedule_shutdown_jobs
        jobs = []

        @executors.each do |executor|
          executor.coordinator.increment(:shutdown)
          job = @jobs_builder.shutdown(executor)
          jobs << job
        end

        return if jobs.empty?

        jobs.each(&:before_schedule)
        @scheduler.on_schedule_shutdown(jobs)
      end

      # Takes the messages per topic partition and enqueues processing jobs in threads using
      # given scheduler. It also handles the idle jobs when filtering API removed all messages
      # and we need to run house-keeping
      def build_and_schedule_flow_jobs
        return if @messages_buffer.empty?

        consume_jobs = []
        idle_jobs = []

        @messages_buffer.each do |topic, partition, messages|
          coordinator = @coordinators.find_or_create(topic, partition)
          # Start work coordination for this topic partition
          coordinator.start(messages)

          # We do not increment coordinator for idle job because it's not a user related one
          # and it will not go through a standard lifecycle. Same applies to revoked and shutdown
          if messages.empty?
            coordinator.increment(:idle)
            executor = @executors.find_or_create(topic, partition, 0, coordinator)
            idle_jobs << @jobs_builder.idle(executor)
          else
            @partitioner.call(topic, messages, coordinator) do |group_id, partition_messages|
              coordinator.increment(:consume)
              executor = @executors.find_or_create(topic, partition, group_id, coordinator)
              consume_jobs << @jobs_builder.consume(executor, partition_messages)
            end
          end
        end

        # We schedule the idle jobs before running the `#before_schedule` on the consume jobs so
        # workers can already pick up the idle jobs while the `#before_schedule` on consumption
        # jobs runs
        unless idle_jobs.empty?
          idle_jobs.each(&:before_schedule)
          @scheduler.on_schedule_idle(idle_jobs)
        end

        unless consume_jobs.empty?
          consume_jobs.each(&:before_schedule)
          @scheduler.on_schedule_consumption(consume_jobs)
        end
      end

      # Builds and schedules periodic jobs for topics partitions for which no messages were
      # received recently. In case `Idle` job is invoked, we do not run periodic. Idle means that
      # a complex flow kicked in and it was a user choice not to run consumption but messages were
      # shipped.
      def build_and_schedule_periodic_jobs
        # Shortcut if periodic jobs are not used at all. No need to run the complex flow when it
        # will never end up with anything. If periodics on any of the topics are not even defined,
        # we can finish fast
        @periodic_jobs ||= @subscription_group.topics.count(&:periodic_job?)

        return if @periodic_jobs.zero?

        jobs = []

        # We select only currently assigned topics and partitions from the current subscription
        # group as only those are of our interest. We then filter that to only pick those for whom
        # we want to run periodic jobs and then we select only those that did not receive any
        # messages recently. This ensures, that we do not tick close to recent arrival of messages
        # but rather after certain period of inactivity
        Karafka::App.assignments.each do |topic, partitions|
          # Skip for assignments not from our subscription group
          next unless topic.subscription_group == @subscription_group
          # Skip if this topic does not have periodic jobs enabled
          next unless topic.periodic_job?

          topic_name = topic.name
          interval = topic.periodic_job.interval

          partitions.each do |partition|
            coordinator = @coordinators.find_or_create(topic_name, partition)

            # Skip if we were operating on a given topic partition recently
            next if coordinator.active_within?(interval)

            # Do not tick if we do not want to tick during pauses
            next if coordinator.paused? && !topic.periodic_job.during_pause?

            # If we do not want to run periodics during retry flows, we should not
            # Since this counter is incremented before processing, here it is always -1 from what
            # we see in the consumer flow. This is why attempt 0 means that we will have first
            # run (ok) but attempt 1 means, there was an error and we will retry
            next if coordinator.attempt.positive? && !topic.periodic_job.during_retry?

            @executors.find_all_or_create(topic_name, partition, coordinator).each do |executor|
              coordinator.increment(:periodic)
              jobs << @jobs_builder.periodic(executor)
            end
          end
        end

        return if jobs.empty?

        jobs.each(&:before_schedule)
        @scheduler.on_schedule_periodic(jobs)
      end

      # Waits for all the jobs from a given subscription group to finish before moving forward
      def wait
        @jobs_queue.wait(@subscription_group.id) do
          @interval_runner.call
        end
      end

      # Waits without blocking the polling
      #
      # This should be used only when we no longer plan to use any incoming messages data and we
      # can safely discard it. We can however use the rebalance information if needed.
      #
      # @param wait_until [Proc] until this evaluates to true, we will poll data
      # @param after_ping [Proc] code that we want to run after each ping (if any)
      #
      # @note Performance of this is not relevant (in regards to blocks) because it is used only
      #   on shutdown and quiet, hence not in the running mode
      def wait_pinging(wait_until:, after_ping: -> {})
        until wait_until.call
          @client.ping
          @scheduler.on_manage

          after_ping.call
          sleep(0.2)
        end
      end

      # We can stop client without a problem, as it will reinitialize itself when running the
      # `#fetch_loop` again. We just need to remember to also reset the runner as it is a long
      # running one, so with a new connection to Kafka, we need to initialize the state of the
      # runner and underlying consumers once again.
      def reset
        # If there was any problem with processing, before we reset things we need to make sure,
        # there are no jobs in the queue. Otherwise it could lead to leakage in between client
        # resetting.
        @jobs_queue.wait(@subscription_group.id)
        @jobs_queue.clear(@subscription_group.id)
        @scheduler.on_clear(@subscription_group.id)
        @events_poller.reset
        @client.reset
        @coordinators.reset
        @interval_runner.reset
        @executors = Processing::ExecutorsBuffer.new(@client, @subscription_group)
      end
    end
  end
end