# lib/karafka/processing/worker.rb
# frozen_string_literal: true
module Karafka
module Processing
# Workers are used to run jobs in separate threads.
# Workers are the main processing units of the Karafka framework.
#
# Each job runs in three stages:
# - prepare - here we can run any code that we would need to run blocking before we allow
# the job to run fully async (non blocking). This will always run in a blocking
# way and can be used to make sure all the resources and external dependencies
# are satisfied before going async.
#
# - call - actual processing logic that can run sync or async
#
# - teardown - it should include any code that we want to run after we executed the user
# code. This can be used to unlock certain resources or do other things that are
# not user code but need to run after user code base is executed.
class Worker
  include Helpers::Async

  # @return [String] id of this worker
  attr_reader :id

  # @param jobs_queue [JobsQueue] queue from which this worker pops the jobs it runs
  # @return [Worker]
  def initialize(jobs_queue)
    @id = SecureRandom.hex(6)
    @jobs_queue = jobs_queue
  end

  private

  # Runs processing of jobs in a loop.
  # Stops as soon as `#process` returns a falsy value, which is the case when no job was popped
  # from the queue (queue being closed).
  def call
    loop { break unless process }
  end

  # Fetches a single job, processes it and marks it as completed.
  #
  # Publishes `worker.process` before the job runs, wraps the job execution in
  # `worker.processed` and always publishes `worker.completed` at the end, regardless of the
  # outcome.
  #
  # @return [Object] `false` when no job was popped; otherwise the result of the
  #   `worker.processed` instrumentation, whose block evaluates to `true`
  #
  # @note We do not have error handling here, as no errors should propagate this far. If they
  #   do, it is a critical error: it is published as `error.occurred` and not re-raised.
  #
  # @note Upon closing the jobs queue, worker will close its thread
  #
  # NOTE(review): after a rescued exception this method returns whatever
  #   `Karafka.monitor.instrument` returns for `error.occurred`. If that value is falsy, the
  #   loop in `#call` exits - confirm this is the intended behavior.
  def process
    job = @jobs_queue.pop

    instrument_details = { caller: self, job: job, jobs_queue: @jobs_queue }

    if job
      Karafka.monitor.instrument('worker.process', instrument_details)

      Karafka.monitor.instrument('worker.processed', instrument_details) do
        job.before_call

        # If a job is marked as non blocking, we can run a tick in the job queue and if there
        # are no other blocking factors, the job queue will be unlocked.
        # If this does not run, all the things will be blocking and job queue won't allow to
        # pass it until done.
        @jobs_queue.tick(job.group_id) if job.non_blocking?

        job.call

        job.after_call

        true
      end
    else
      false
    end

  # We signal critical exceptions, notify and do not allow worker to fail
  # rubocop:disable Lint/RescueException
  rescue Exception => e
    # rubocop:enable Lint/RescueException
    Karafka.monitor.instrument(
      'error.occurred',
      caller: self,
      job: job,
      jobs_queue: @jobs_queue,
      error: e,
      type: 'worker.process.error'
    )
  ensure
    # job can be nil when the queue is being closed
    if job
      @jobs_queue.complete(job)
      job.finish!
    end

    # Always publish info, that we completed all the work despite its result.
    # When popping from the queue itself raised, `instrument_details` was never assigned and is
    # still nil at this point, so we build the payload here instead of publishing a nil one.
    Karafka.monitor.instrument(
      'worker.completed',
      instrument_details || { caller: self, job: job, jobs_queue: @jobs_queue }
    )
  end
end
end
end