salsify/delayed_job_heartbeat_plugin

View on GitHub
lib/delayed/heartbeat/worker.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

require 'socket'

require 'delayed/heartbeat/compatibility'

module Delayed
  module Heartbeat
    # ActiveRecord model stored in the +delayed_workers+ table. A record carries
    # a worker's +name+, +version+, +host_name+, +label+ and +last_heartbeat_at+,
    # and exposes helpers for finding and unlocking the Delayed::Job rows that
    # are locked by that worker name.
    class Worker < ActiveRecord::Base
      self.table_name = 'delayed_workers'

      # Only whitelist attributes when mass-assignment security is active.
      if Delayed::Heartbeat::Compatibility.mass_assignment_security_enabled?
        attr_accessible :name, :version, :last_heartbeat_at, :host_name, :label
      end

      # Fill in defaults for any attribute the caller left unset. Values that
      # were assigned explicitly are never overwritten (`||=`).
      before_create do |model|
        model.last_heartbeat_at ||= Time.now.utc
        model.host_name ||= Socket.gethostname
        # Fall back to the worker's own name when no label is configured.
        model.label ||= Delayed::Heartbeat.configuration.worker_label || model.name
        model.version ||= Delayed::Heartbeat.configuration.worker_version
      end

      # Jobs currently locked by this worker that have not been marked failed.
      #
      # @return [ActiveRecord::Relation] Delayed::Job rows whose +locked_by+
      #   equals this worker's +name+ and whose +failed_at+ is NULL
      def jobs
        Delayed::Job.where(locked_by: name, failed_at: nil)
      end

      # First job returned by {#jobs}.
      #
      # @return [Delayed::Job, nil] nil when this worker holds no jobs
      def job
        jobs.first
      end

      # Releases every job locked by this worker.
      #
      # @param mark_attempt_failed [Boolean] when true each job is handed to
      #   Delayed::Worker#reschedule, which counts the attempt as failed; when
      #   false the lock columns are simply cleared in a single bulk update
      # @return [Array<Delayed::Job>] the jobs that were unlocked (empty when
      #   there were none). The records are loaded before unlocking, so in the
      #   bulk-update branch their in-memory lock attributes are not refreshed.
      def unlock_jobs(mark_attempt_failed: true)
        orphaned_jobs = jobs.to_a
        return orphaned_jobs if orphaned_jobs.empty?

        if mark_attempt_failed
          mark_job_attempts_failed(orphaned_jobs)
        else
          Delayed::Job.where(id: orphaned_jobs.map(&:id)).update_all(locked_at: nil, locked_by: nil)
        end

        orphaned_jobs
      end

      # Workers whose last heartbeat is older than +timeout_seconds+ ago.
      #
      # @param timeout_seconds [Numeric] maximum allowed heartbeat age in seconds
      # @return [ActiveRecord::Relation]
      def self.dead_workers(timeout_seconds)
        where('last_heartbeat_at < ?', Time.now.utc - timeout_seconds)
      end

      # Workers whose +version+ column differs from +current_version+.
      #
      # NOTE(review): this is a plain SQL `!=` comparison, so rows with a NULL
      # version (or a nil +current_version+) presumably match nothing under
      # standard SQL NULL semantics - confirm that is intended.
      #
      # @param current_version [String] version to compare against
      # @return [ActiveRecord::Relation]
      def self.workers_with_different_version(current_version)
        where('version != ?', current_version)
      end

      # Deletes the given worker rows by id with a single +delete_all+.
      #
      # @param workers [Enumerable<Worker>] workers to remove
      # @return [Object, nil] the result of +delete_all+, or nil when +workers+
      #   is blank and nothing was attempted
      def self.delete_workers(workers)
        where(id: workers.map(&:id)).delete_all if workers.present?
      end

      private

      # Marks one failed attempt on each of the given jobs, sharing a single
      # Delayed::Worker instance across all of them.
      def mark_job_attempts_failed(orphaned_jobs)
        dj_worker = Delayed::Worker.new
        orphaned_jobs.each { |orphaned_job| mark_job_attempt_failed(dj_worker, orphaned_job) }
      end

      # If there are more attempts this reschedules the job otherwise marks it
      # as failed and runs appropriate callbacks.
      def mark_job_attempt_failed(dj_worker, job)
        dj_worker.reschedule(job)
      end
    end
  end
end