# frozen_string_literal: true

module Karafka
module Connection
# A single listener that listens to incoming messages from a single subscription group.
# It polls the messages and then enqueues jobs. It also takes care of potential recovery from
# critical errors by restarting everything in a safe manner.
#
# This is the heart of the consumption process.
#
    # It provides an async API for management, so all status changes are expected to happen
    # asynchronously.
class Listener
      include Helpers::Async

      # Can be useful for logging
      # @return [String] id of this listener
      attr_reader :id

      # @return [Karafka::Routing::SubscriptionGroup] subscription group that this listener handles
      attr_reader :subscription_group

      # How long to wait (in milliseconds) on the initial events poll. This increases the chance
      # of having the initial events available immediately
      INITIAL_EVENTS_POLL_TIMEOUT = 100
      private_constant :INITIAL_EVENTS_POLL_TIMEOUT

# @param subscription_group [Karafka::Routing::SubscriptionGroup]
# @param jobs_queue [Karafka::Processing::JobsQueue] queue where we should push work
# @param scheduler [Karafka::Processing::Scheduler] scheduler we want to use
# @return [Karafka::Connection::Listener] listener instance
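      #
      # Listeners are normally built by the framework internals. The sketch below is therefore
      # illustrative only, and the collaborator lookups in it are assumptions, not public API.
      #
      # @example Rough construction sketch
      #   jobs_queue = Karafka::Processing::JobsQueue.new
      #   scheduler = Karafka::App.config.internal.processing.scheduler_class.new(jobs_queue)
      #   group = Karafka::App.subscription_groups.values.flatten.first
      #   listener = Karafka::Connection::Listener.new(group, jobs_queue, scheduler)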
def initialize(subscription_group, jobs_queue, scheduler)
proc_config = ::Karafka::App.config.internal.processing
@id = SecureRandom.hex(6)
@subscription_group = subscription_group
@jobs_queue = jobs_queue
@coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
@client = Client.new(@subscription_group, -> { running? })
@executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
@jobs_builder = proc_config.jobs_builder
@partitioner = proc_config.partitioner_class.new(subscription_group)
@scheduler = scheduler
@events_poller = Helpers::IntervalRunner.new { @client.events_poll }

        # We keep one buffer for messages to preserve memory and avoid allocating extra objects.
        # We can do it this way because we always schedule jobs from the current messages before
        # we fetch another batch.
@messages_buffer = MessagesBuffer.new(subscription_group)
@mutex = Mutex.new
@status = Status.new
@jobs_queue.register(@subscription_group.id)

        # This makes sure that even if we tick more often than the interval time due to frequent
        # unlocks from short-lived jobs or async queue synchronization, events handling and jobs
        # scheduling still happen with the expected frequency
@interval_runner = Helpers::IntervalRunner.new do
@events_poller.call
@scheduler.on_manage
end
end

      # Runs the main listener fetch loop.
      #
      # @note Prefetch callbacks can be used to seek offsets or do other things before we
      #   actually start consuming data
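      #
      # The sketch below assumes the default monitor; it only inspects the event payload that
      # this method publishes.
      #
      # @example Observing the pre-fetch stage
      #   Karafka.monitor.subscribe('connection.listener.before_fetch_loop') do |event|
      #     event[:client]             # => Karafka::Connection::Client
      #     event[:subscription_group] # => Karafka::Routing::SubscriptionGroup
      #   end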
def call
Karafka.monitor.instrument(
'connection.listener.before_fetch_loop',
caller: self,
client: @client,
subscription_group: @subscription_group
)
fetch_loop
Karafka.monitor.instrument(
'connection.listener.after_fetch_loop',
caller: self,
client: @client,
subscription_group: @subscription_group
)
end

      # Aliases all status operations directly on the listener, so we have a listener-facing API
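      #
      # For example, given a `running` state with a `running!` transition in `Status::STATES`,
      # this defines both `#running?` and `#running!` on the listener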
Status::STATES.each do |state, transition|
# @return [Boolean] is the listener in a given state
define_method "#{state}?" do
@status.public_send("#{state}?")
end
# Moves listener to a given state
define_method transition do
@status.public_send(transition)
end
end

      # @return [Boolean] is this listener active (not stopped and not pending)
def active?
@status.active?
end

      # We overwrite the generated `#start!` because on start we also need to run the listener
      # in an async thread. While for other state transitions a status change is enough, here we
      # need to start the background work as well
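      #
      # A hypothetical restart flow, assuming `listener` was previously running:
      #
      # @example Restarting a stopped listener
      #   listener.shutdown
      #   listener.stopped? # => true
      #   listener.start!   # resets the client and status, then spawns the async thread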
def start!
if stopped?
@client.reset
@status.reset!
end
@status.start!
async_call("karafka.listener##{@subscription_group.id}")
end

      # Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and
      # stops the Kafka client.
      #
      # @note This method is not private despite being part of the fetch loop because in case of
      #   a forceful shutdown, it may be invoked from a separate thread
      #
      # @note We wrap it with a mutex exactly because of the above case of forceful shutdown
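      #
      # @example Forceful shutdown from a separate, hypothetical supervising thread
      #   Thread.new { listener.shutdown }.join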
def shutdown
@mutex.synchronize do
return if stopped?
# Nothing to clear if it was not even running
return stopped! if pending?
@executors.clear
@coordinators.reset
@client.stop
stopped!
end
end

      private

      # Fetches the data and adds it to the jobs queue.
      #
      # @note We catch all the errors here, so they don't affect other listeners (or this one)
      #   and we remain able to listen for and consume other incoming messages.
      #   Since this runs inside the Karafka::Connection::Runner thread, catching all exceptions
      #   here won't crash the whole process. We mostly focus on catching exceptions related to
      #   Kafka connections, Internet connection issues, etc. Business logic problems should not
      #   propagate this far.
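      #
      # The sketch below assumes the default monitor; `ErrorTracker` is a hypothetical error
      # reporter used purely for illustration.
      #
      # @example Tracking fetch loop crashes
      #   Karafka.monitor.subscribe('error.occurred') do |event|
      #     next unless event[:type] == 'connection.listener.fetch_loop.error'
      #     ErrorTracker.notify(event[:error])
      #   end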
def fetch_loop
running!

        # Run the initial events fetch to improve the chances of having metrics and initial
        # callbacks triggered on start.
        #
        # In theory this may slow down the initial boot, but since we limit the wait to 100ms,
        # it should not have a big impact. It may not always be enough, but Karafka does not
        # guarantee at boot that statistics or other callbacks are immediately available, hence
        # this is a fair trade-off
        @client.events_poll(INITIAL_EVENTS_POLL_TIMEOUT)

# Run the main loop as long as we are not stopping or moving into quiet mode
while running?
Karafka.monitor.instrument(
'connection.listener.fetch_loop',
caller: self,
client: @client,
subscription_group: @subscription_group
)

          resume_paused_partitions

Karafka.monitor.instrument(
'connection.listener.fetch_loop.received',
caller: self,
client: @client,
subscription_group: @subscription_group,
messages_buffer: @messages_buffer
) do
            # We need to fetch data before we revoke the lost partitions' details, as during
            # polling the callbacks for tracking lost partitions are triggered. Otherwise we
            # would always be one batch behind.
            poll_and_remap_messages
end

          # If there were revoked partitions, we need to wait for their jobs to finish before
          # distributing consuming jobs, as upon revocation we might get assigned the same
          # partitions again, thus getting their jobs. The revocation jobs need to finish before
          # the appropriate consumers are taken down and re-created
          build_and_schedule_revoked_jobs_for_revoked_partitions

          # We wait only on jobs from our subscription group. Other groups are independent.
          # This will block on revocation jobs until they are finished. Those are not meant to
          # last long and should not have any big impact on the system. Doing this in a blocking
          # way simplifies the overall design and prevents race conditions
          wait

          build_and_schedule_flow_jobs

          # Periodic jobs never run on topics and partitions that were already scheduled, so
          # there is no risk in having a collective wait after both
          build_and_schedule_periodic_jobs if Karafka.pro?

          wait
end

        # If we are stopping, we will no longer schedule any regular jobs despite polling.
        # We need to keep polling so as not to exceed `max.poll.interval.ms` for long-running
        # non-blocking jobs, and we need to allow them to finish. We however do not want to
        # enqueue any new jobs. It's worth keeping in mind that it is the end user's
        # responsibility to detect shutdown in their long-running logic, or else Karafka will
        # force a shutdown after a while.
        #
        # We do not care about resuming any partitions or lost jobs, as we do not plan to do
        # anything with them since we're in the shutdown phase.
        #
        # What we do care about, however, is the ability to still run revocation jobs in case
        # anything changes in the cluster. We still want to notify the long-running jobs about
        # the changes that occurred in the cluster.
        wait_pinging(
          wait_until: -> { @jobs_queue.empty?(@subscription_group.id) },
          after_ping: -> { build_and_schedule_revoked_jobs_for_revoked_partitions }
        )

        # We do not want to schedule the shutdown jobs prior to finishing all the jobs
        # (including the non-blocking ones), as there might be a long-running job with a
        # shutdown and then we would run two jobs in parallel for the same executor and
        # consumer. We do not want that, as it could create a race condition.
        build_and_schedule_shutdown_jobs

        # Wait until all the shutdown jobs are done
        wait_pinging(wait_until: -> { @jobs_queue.empty?(@subscription_group.id) })

        quieted!

        # Wait if we're in the process of finishing started work, or if we've finished all the
        # work and are just sitting and being quiet
        wait_pinging(wait_until: -> { !quiet? })

        # This extra ping will make sure we've refreshed the rebalance state after other
        # instances have potentially shut down. This will prevent us from closing with a
        # dangling callback
        @client.ping

        shutdown

      # This is on purpose - see the notes for this method
      # rubocop:disable Lint/RescueException
rescue Exception => e
# rubocop:enable Lint/RescueException
Karafka.monitor.instrument(
'error.occurred',
caller: self,
error: e,
type: 'connection.listener.fetch_loop.error'
)
reset
sleep(1) && retry
end

      # Resumes processing of partitions that were paused due to an error.
def resume_paused_partitions
@coordinators.resume do |topic, partition|
@client.resume(topic.name, partition)
end
end

      # Polls messages within the time and amount boundaries defined in the settings and then
      # builds Karafka messages based on the raw rdkafka messages buffer returned by the
      # `#batch_poll` method.
      #
      # @note There are two buffers: one for raw messages and one for the "built" Karafka
      #   messages
def poll_and_remap_messages
@messages_buffer.remap(
@client.batch_poll
)
end

      # Enqueues revoking jobs for partitions that were taken away from the running process.
def build_and_schedule_revoked_jobs_for_revoked_partitions
revoked_partitions = @client.rebalance_manager.revoked_partitions
# Stop early to save on some execution and array allocation
return if revoked_partitions.empty?
jobs = []
revoked_partitions.each do |topic, partitions|
partitions.each do |partition|
@coordinators.revoke(topic, partition)
            # There may be a case where we have lost a partition whose data we have never
            # processed (if it was assigned and revoked really fast), thus we may not have it
            # here. In cases like this, we do not run a revocation job
@executors.find_all(topic, partition).each do |executor|
executor.coordinator.increment(:revoked)
jobs << @jobs_builder.revoked(executor)
end

            # We need to remove all the executors of a given topic partition that we have lost,
            # so the next time we pick up its work, new executors kick in. This may be needed
            # especially for LRJ, where we could end up with a race condition.
            # This revocation needs to happen after the jobs are scheduled; otherwise they would
            # be scheduled with the new executors instead of the old ones
@executors.revoke(topic, partition)
end
end
return if jobs.empty?
jobs.each(&:before_schedule)
@scheduler.on_schedule_revocation(jobs)
end

      # Enqueues the shutdown jobs for all the executors that exist in our subscription group
def build_and_schedule_shutdown_jobs
jobs = []
@executors.each do |executor|
executor.coordinator.increment(:shutdown)
job = @jobs_builder.shutdown(executor)
jobs << job
end
return if jobs.empty?
jobs.each(&:before_schedule)
@scheduler.on_schedule_shutdown(jobs)
end

      # Takes the messages per topic partition and enqueues processing jobs in threads using
      # the given scheduler. It also handles the idle jobs for cases when the filtering API
      # removed all the messages and we need to run housekeeping
def build_and_schedule_flow_jobs
return if @messages_buffer.empty?
consume_jobs = []
idle_jobs = []
@messages_buffer.each do |topic, partition, messages|
coordinator = @coordinators.find_or_create(topic, partition)
# Start work coordination for this topic partition
coordinator.start(messages)
          # The idle job is not a user-related one and will not go through the standard
          # lifecycle; the same applies to the revoked and shutdown jobs. We still increment
          # the coordinator with the appropriate flow label so such jobs are tracked
if messages.empty?
coordinator.increment(:idle)
executor = @executors.find_or_create(topic, partition, 0, coordinator)
idle_jobs << @jobs_builder.idle(executor)
else
@partitioner.call(topic, messages, coordinator) do |group_id, partition_messages|
coordinator.increment(:consume)
executor = @executors.find_or_create(topic, partition, group_id, coordinator)
consume_jobs << @jobs_builder.consume(executor, partition_messages)
end
end
end

        # We schedule the idle jobs before running `#before_schedule` on the consume jobs, so
        # workers can already pick up the idle jobs while `#before_schedule` of the consumption
        # jobs runs
unless idle_jobs.empty?
idle_jobs.each(&:before_schedule)
@scheduler.on_schedule_idle(idle_jobs)
end

        unless consume_jobs.empty?
consume_jobs.each(&:before_schedule)
@scheduler.on_schedule_consumption(consume_jobs)
end
end

      # Builds and schedules periodic jobs for topic partitions for which no messages were
      # received recently. When an `Idle` job is invoked, we do not run a periodic one. Idle
      # means that a complex flow kicked in and it was the user's choice not to run consumption
      # even though messages were shipped.
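      #
      # The routing sketch below is an assumption: the option names mirror the `interval`,
      # `during_pause?` and `during_retry?` readers used in this method, and `EventsConsumer`
      # is a hypothetical consumer class.
      #
      # @example Enabling periodic jobs for a topic (Pro)
      #   topic :events do
      #     consumer EventsConsumer
      #     periodic(interval: 60_000, during_pause: false, during_retry: false)
      #   end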
def build_and_schedule_periodic_jobs
        # Shortcut if periodic jobs are not used at all. No need to run the complex flow when
        # it will never end up with anything. If periodic jobs are not enabled on any of the
        # topics, we can finish fast
        @periodic_jobs ||= @subscription_group.topics.count(&:periodic_job?)

        return if @periodic_jobs.zero?

        jobs = []

        # We select only currently assigned topics and partitions from the current subscription
        # group, as only those are of interest to us. We then filter those down to the ones with
        # periodic jobs enabled and select only those that did not receive any messages
        # recently. This ensures that we do not tick close to a recent arrival of messages but
        # rather after a certain period of inactivity
Karafka::App.assignments.each do |topic, partitions|
# Skip for assignments not from our subscription group
next unless topic.subscription_group == @subscription_group
# Skip if this topic does not have periodic jobs enabled
next unless topic.periodic_job?
topic_name = topic.name
interval = topic.periodic_job.interval
partitions.each do |partition|
coordinator = @coordinators.find_or_create(topic_name, partition)
# Skip if we were operating on a given topic partition recently
next if coordinator.active_within?(interval)
# Do not tick if we do not want to tick during pauses
next if coordinator.paused? && !topic.periodic_job.during_pause?
            # If we do not want to run periodic jobs during retry flows, we skip them.
            # Since this counter is incremented before processing, here it is always one less
            # than what we see in the consumer flow. This is why attempt 0 means the first run
            # is yet to happen (ok), while attempt 1 means there was an error and we will retry
            next if coordinator.attempt.positive? && !topic.periodic_job.during_retry?
@executors.find_all_or_create(topic_name, partition, coordinator).each do |executor|
coordinator.increment(:periodic)
jobs << @jobs_builder.periodic(executor)
end
end
end
return if jobs.empty?
jobs.each(&:before_schedule)
@scheduler.on_schedule_periodic(jobs)
end

      # Waits for all the jobs from a given subscription group to finish before moving forward
def wait
@jobs_queue.wait(@subscription_group.id) do
@interval_runner.call
end
end

      # Waits without blocking the polling.
      #
      # This should be used only when we no longer plan to use any of the incoming messages
      # data and we can safely discard it. We can however still use the rebalance information
      # if needed.
      #
      # @param wait_until [Proc] until this evaluates to true, we will poll data
      # @param after_ping [Proc] code that we want to run after each ping (if any)
      #
      # @note Performance of this is not relevant (in regard to the blocks) because it is used
      #   only on shutdown and quiet, hence not in the running mode
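      #
      # @example Shutdown-time usage (condensed from `#fetch_loop` above)
      #   wait_pinging(
      #     wait_until: -> { @jobs_queue.empty?(@subscription_group.id) },
      #     after_ping: -> { build_and_schedule_revoked_jobs_for_revoked_partitions }
      #   )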
def wait_pinging(wait_until:, after_ping: -> {})
until wait_until.call
@client.ping
@scheduler.on_manage
after_ping.call
sleep(0.2)
end
end

      # We can stop the client without a problem, as it will reinitialize itself when we run
      # `#fetch_loop` again. We just need to remember to also reset the runner, as it is a
      # long-running one, so with a new connection to Kafka we need to initialize the state of
      # the runner and of the underlying consumers once again.
      def reset
        # If there was any problem with processing, before we reset things we need to make sure
        # there are no jobs in the queue. Otherwise it could lead to leakage in between client
        # resets.
@jobs_queue.wait(@subscription_group.id)
@jobs_queue.clear(@subscription_group.id)
@scheduler.on_clear(@subscription_group.id)
@events_poller.reset
@client.reset
@coordinators.reset
@interval_runner.reset
@executors = Processing::ExecutorsBuffer.new(@client, @subscription_group)
end
end
end
end