lib/delayed/heartbeat/worker.rb
# frozen_string_literal: true
require 'delayed/heartbeat/compatibility'
module Delayed
module Heartbeat
class Worker < ActiveRecord::Base
self.table_name = 'delayed_workers'
if Delayed::Heartbeat::Compatibility.mass_assignment_security_enabled?
attr_accessible :name, :version, :last_heartbeat_at, :host_name, :label
end
before_create do |model|
model.last_heartbeat_at ||= Time.now.utc
model.host_name ||= Socket.gethostname
model.label ||= Delayed::Heartbeat.configuration.worker_label || name
model.version ||= Delayed::Heartbeat.configuration.worker_version
end
def jobs
Delayed::Job.where(locked_by: name, failed_at: nil)
end
def job
jobs.first
end
# Returns the jobs that were unlocked
def unlock_jobs(mark_attempt_failed: true)
orphaned_jobs = jobs.to_a
return orphaned_jobs unless orphaned_jobs.present?
if mark_attempt_failed
mark_job_attempts_failed(orphaned_jobs)
else
Delayed::Job.where(id: orphaned_jobs.map(&:id)).update_all(locked_at: nil, locked_by: nil)
end
orphaned_jobs
end
def self.dead_workers(timeout_seconds)
where('last_heartbeat_at < ?', Time.now.utc - timeout_seconds)
end
def self.workers_with_different_version(current_version)
where('version != ?', current_version)
end
def self.delete_workers(workers)
where(id: workers.map(&:id)).delete_all if workers.present?
end
private
def mark_job_attempts_failed(jobs)
dj_worker = Delayed::Worker.new
jobs.each do |job|
mark_job_attempt_failed(dj_worker, job)
end
end
def mark_job_attempt_failed(dj_worker, job)
# If there are more attempts this reschedules the job otherwise marks it as failed
# and runs appropriate callbacks
dj_worker.reschedule(job)
end
end
end
end