salsify/delayed_job_heartbeat_plugin

View on GitHub
lib/delayed/heartbeat/worker_heartbeat.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

module Delayed
  module Heartbeat
    # Creates a Delayed::Heartbeat::Worker record for a worker and refreshes its
    # +last_heartbeat_at+ column from a background thread until #stop is called
    # or the heartbeat can no longer be written.
    class WorkerHeartbeat

      # Creates the worker record, yields +self+ when a block is given and then
      # starts the heartbeat thread.
      #
      # @param worker_name name stored on the worker record; any existing
      #   record with the same name is deleted first
      # @yieldparam heartbeat [WorkerHeartbeat] the instance, before the
      #   heartbeat thread has been started
      def initialize(worker_name)
        @worker_model = create_worker_model(worker_name)

        # Use a self-pipe to safely shutdown the heartbeat thread
        @stop_reader, @stop_writer = IO.pipe

        yield(self) if block_given?

        @heartbeat_thread = Thread.new { run_heartbeat_loop }
        # Make this a high priority thread to try to ensure it runs
        @heartbeat_thread.priority = 100
      end

      # @return [Boolean] whether the heartbeat thread is still running
      def alive?
        @heartbeat_thread.alive?
      end

      # Asks the heartbeat thread to shut down. Calling it again is a no-op
      # because the writer is cleared after the first call.
      #
      # @return [void]
      def stop
        # Use the self-pipe to tell the heartbeat thread to cleanly
        # shutdown: closing the write end makes the read end readable, which
        # wakes the thread out of its IO.select based sleep.
        return unless @stop_writer

        @stop_writer.close
        @stop_writer = nil
      end

      private

      # Replaces any existing worker record with the given name by a fresh
      # one, inside a single transaction.
      #
      # @return the newly created worker record
      def create_worker_model(worker_name)
        Delayed::Heartbeat::Worker.transaction do
          Delayed::Heartbeat::Worker.where(name: worker_name).delete_all
          Delayed::Heartbeat::Worker.create!(name: worker_name)
        end
      end

      # Body of the heartbeat thread: sleeps for the heartbeat interval,
      # writes the heartbeat and repeats until the sleep is interrupted by
      # #stop. Any StandardError terminates the whole worker process.
      def run_heartbeat_loop
        loop do
          break if sleep_interruptibly(heartbeat_interval)

          update_heartbeat
          # Return the connection back to the pool since we won't be needing
          # it again for a while.
          clear_active_connections
        end
      rescue StandardError => e
        terminate_worker(e)
      ensure
        cleanup_heartbeat
      end

      # Notifies the configured termination callback and exits the process.
      #
      # We don't want the worker to continue running if the heartbeat can't be written.
      # Don't use Thread.abort_on_exception because that will give Delayed::Job a chance
      # to mark the job as failed which will unlock it even though the clock
      # process has likely already unlocked it and another worker may have picked it up.
      #
      # @param error [StandardError] the failure that stopped the heartbeat loop
      def terminate_worker(error)
        begin
          Delayed::Heartbeat.configuration.on_worker_termination.call(@worker_model, error)
        rescue StandardError => callback_error
          # A broken callback must not keep the worker alive without a heartbeat.
          warn("Delayed::Heartbeat termination callback failed: " \
               "#{callback_error.class}: #{callback_error.message}")
        end
        exit(false)
      end

      # Releases everything the heartbeat thread owns. Runs from an +ensure+,
      # possibly while the SystemExit raised by #terminate_worker is in
      # flight, so every step is isolated: an exception escaping from here
      # would replace that SystemExit and leave the worker running without a
      # heartbeat, and would also skip the remaining steps.
      def cleanup_heartbeat
        attempt_cleanup('close stop pipe') { @stop_reader.close }
        attempt_cleanup('delete worker record') { @worker_model.delete }
        # NOTE: The built-in Delayed::Plugins::ClearLocks will unlock the jobs for us
        attempt_cleanup('release database connections') { clear_active_connections }
      end

      # Runs one cleanup step, reporting (instead of propagating) a failure.
      #
      # @param step [String] short description used in the warning message
      def attempt_cleanup(step)
        yield
      rescue StandardError => e
        warn("Delayed::Heartbeat cleanup step failed (#{step}): #{e.class}: #{e.message}")
      end

      # Hands the connections used by this thread back to the pool.
      def clear_active_connections
        Delayed::Backend::ActiveRecord::Job.connection_handler.clear_active_connections!
      end

      # Writes the current UTC time as the heartbeat, unless the previous
      # heartbeat is older than the configured timeout and self termination
      # is enabled.
      #
      # @raise [Timeout::Error] when the heartbeat is stale and self
      #   termination is enabled
      def update_heartbeat
        now = Time.now.utc
        heartbeat_delta_seconds = now - @worker_model.last_heartbeat_at
        if heartbeat_delta_seconds < heartbeat_timeout_seconds || self_termination_disabled?
          @worker_model.update_column(:last_heartbeat_at, now)
        else
          raise Timeout::Error, "Worker heartbeat not updated for #{heartbeat_delta_seconds} seconds which " \
              "exceeds timeout\n. Current job: #{@worker_model.job.inspect}"
        end
      end

      # @return [Boolean] true when the configuration disables worker termination
      def self_termination_disabled?
        !Delayed::Heartbeat.configuration.worker_termination_enabled?
      end

      # @return the configured heartbeat timeout, in seconds
      def heartbeat_timeout_seconds
        Delayed::Heartbeat.configuration.heartbeat_timeout_seconds
      end

      # @return the configured pause between heartbeats, in seconds
      def heartbeat_interval
        Delayed::Heartbeat.configuration.heartbeat_interval_seconds
      end

      # Sleeps for up to +secs+ seconds, waking early when the stop pipe
      # becomes readable.
      #
      # Returns a truthy if the sleep was interrupted
      def sleep_interruptibly(secs)
        IO.select([@stop_reader], nil, nil, secs)
      end
    end
  end
end