collectiveidea/delayed_job

View on GitHub
lib/delayed/backend/shared_spec.rb

Summary

Maintainability
D
1 day
Test Coverage
require File.expand_path('../../../../spec/sample_jobs', __FILE__)

require 'active_support/core_ext/numeric/time'

shared_examples_for 'a delayed_job backend' do
  let(:worker) { Delayed::Worker.new }

  def create_job(opts = {})
    described_class.create(opts.merge(:payload_object => SimpleJob.new))
  end

  before do
    Delayed::Worker.max_priority = nil
    Delayed::Worker.min_priority = nil
    Delayed::Worker.default_priority = 99
    Delayed::Worker.delay_jobs = true
    Delayed::Worker.default_queue_name = 'default_tracking'
    SimpleJob.runs = 0
    described_class.delete_all
  end

  after do
    Delayed::Worker.reset
  end

  it 'sets run_at automatically if not set' do
    expect(described_class.create(:payload_object => ErrorJob.new).run_at).not_to be_nil
  end

  it 'does not set run_at automatically if already set' do
    later = described_class.db_time_now + 5.minutes
    job = described_class.create(:payload_object => ErrorJob.new, :run_at => later)
    expect(job.run_at).to be_within(1).of(later)
  end

  describe '#reload' do
    it 'reloads the payload' do
      job = described_class.enqueue :payload_object => SimpleJob.new
      expect(job.payload_object.object_id).not_to eq(job.reload.payload_object.object_id)
    end
  end

  describe 'enqueue' do
    context 'with a hash' do
      it "raises ArgumentError when handler doesn't respond_to :perform" do
        expect { described_class.enqueue(:payload_object => Object.new) }.to raise_error(ArgumentError)
      end

      it 'is able to set priority' do
        job = described_class.enqueue :payload_object => SimpleJob.new, :priority => 5
        expect(job.priority).to eq(5)
      end

      it 'uses default priority' do
        job = described_class.enqueue :payload_object => SimpleJob.new
        expect(job.priority).to eq(99)
      end

      it 'is able to set run_at' do
        later = described_class.db_time_now + 5.minutes
        job = described_class.enqueue :payload_object => SimpleJob.new, :run_at => later
        expect(job.run_at).to be_within(1).of(later)
      end

      it 'is able to set queue' do
        job = described_class.enqueue :payload_object => NamedQueueJob.new, :queue => 'tracking'
        expect(job.queue).to eq('tracking')
      end

      it 'uses default queue' do
        job = described_class.enqueue :payload_object => SimpleJob.new
        expect(job.queue).to eq(Delayed::Worker.default_queue_name)
      end

      it "uses the payload object's queue" do
        job = described_class.enqueue :payload_object => NamedQueueJob.new
        expect(job.queue).to eq(NamedQueueJob.new.queue_name)
      end
    end

    context 'with multiple arguments' do
      it "raises ArgumentError when handler doesn't respond_to :perform" do
        expect { described_class.enqueue(Object.new) }.to raise_error(ArgumentError)
      end

      it 'increases count after enqueuing items' do
        described_class.enqueue SimpleJob.new
        expect(described_class.count).to eq(1)
      end

      it 'is able to set priority [DEPRECATED]' do
        silence_warnings do
          job = described_class.enqueue SimpleJob.new, 5
          expect(job.priority).to eq(5)
        end
      end

      it 'uses default priority when it is not set' do
        @job = described_class.enqueue SimpleJob.new
        expect(@job.priority).to eq(99)
      end

      it 'is able to set run_at [DEPRECATED]' do
        silence_warnings do
          later = described_class.db_time_now + 5.minutes
          @job = described_class.enqueue SimpleJob.new, 5, later
          expect(@job.run_at).to be_within(1).of(later)
        end
      end

      it 'works with jobs in modules' do
        M::ModuleJob.runs = 0
        job = described_class.enqueue M::ModuleJob.new
        expect { job.invoke_job }.to change { M::ModuleJob.runs }.from(0).to(1)
      end

      it 'does not mutate the options hash' do
        options = {:priority => 1}
        described_class.enqueue SimpleJob.new, options
        expect(options).to eq(:priority => 1)
      end
    end

    context 'with delay_jobs = false' do
      before(:each) do
        Delayed::Worker.delay_jobs = false
      end

      it 'does not increase count after enqueuing items' do
        described_class.enqueue SimpleJob.new
        expect(described_class.count).to eq(0)
      end

      it 'invokes the enqueued job' do
        job = SimpleJob.new
        expect(job).to receive(:perform)
        described_class.enqueue job
      end

      it 'returns a job, not the result of invocation' do
        expect(described_class.enqueue(SimpleJob.new)).to be_instance_of(described_class)
      end
    end
  end

  describe 'callbacks' do
    before(:each) do
      CallbackJob.messages = []
    end

    %w[before success after].each do |callback|
      it "calls #{callback} with job" do
        job = described_class.enqueue(CallbackJob.new)
        expect(job.payload_object).to receive(callback).with(job)
        job.invoke_job
      end
    end

    it 'calls before and after callbacks' do
      job = described_class.enqueue(CallbackJob.new)
      expect(CallbackJob.messages).to eq(['enqueue'])
      job.invoke_job
      expect(CallbackJob.messages).to eq(%w[enqueue before perform success after])
    end

    it 'calls the after callback with an error' do
      job = described_class.enqueue(CallbackJob.new)
      expect(job.payload_object).to receive(:perform).and_raise(RuntimeError.new('fail'))

      expect { job.invoke_job }.to raise_error(RuntimeError)
      expect(CallbackJob.messages).to eq(['enqueue', 'before', 'error: RuntimeError', 'after'])
    end

    it 'calls error when before raises an error' do
      job = described_class.enqueue(CallbackJob.new)
      expect(job.payload_object).to receive(:before).and_raise(RuntimeError.new('fail'))
      expect { job.invoke_job }.to raise_error(RuntimeError)
      expect(CallbackJob.messages).to eq(['enqueue', 'error: RuntimeError', 'after'])
    end
  end

  describe 'payload_object' do
    it 'raises a DeserializationError when the job class is totally unknown' do
      job = described_class.new :handler => '--- !ruby/object:JobThatDoesNotExist {}'
      expect { job.payload_object }.to raise_error(Delayed::DeserializationError)
    end

    it 'raises a DeserializationError when the job struct is totally unknown' do
      job = described_class.new :handler => '--- !ruby/struct:StructThatDoesNotExist {}'
      expect { job.payload_object }.to raise_error(Delayed::DeserializationError)
    end

    it 'raises a DeserializationError when the YAML.load raises argument error' do
      job = described_class.new :handler => '--- !ruby/struct:GoingToRaiseArgError {}'
      expect(YAML).to receive(:load_dj).and_raise(ArgumentError)
      expect { job.payload_object }.to raise_error(Delayed::DeserializationError)
    end

    it 'raises a DeserializationError when the YAML.load raises syntax error' do
      # only test with Psych since the other YAML parsers don't raise a SyntaxError
      if YAML.parser.class.name !~ /syck|yecht/i
        job = described_class.new :handler => 'message: "no ending quote'
        expect { job.payload_object }.to raise_error(Delayed::DeserializationError)
      end
    end
  end

  describe 'reserve' do
    before do
      Delayed::Worker.max_run_time = 2.minutes
    end

    after do
      Time.zone = nil
    end

    it 'does not reserve failed jobs' do
      create_job :attempts => 50, :failed_at => described_class.db_time_now
      expect(described_class.reserve(worker)).to be_nil
    end

    it 'does not reserve jobs scheduled for the future' do
      create_job :run_at => described_class.db_time_now + 1.minute
      expect(described_class.reserve(worker)).to be_nil
    end

    it 'reserves jobs scheduled for the past' do
      job = create_job :run_at => described_class.db_time_now - 1.minute
      expect(described_class.reserve(worker)).to eq(job)
    end

    it 'reserves jobs scheduled for the past when time zones are involved' do
      Time.zone = 'America/New_York'
      job = create_job :run_at => described_class.db_time_now - 1.minute
      expect(described_class.reserve(worker)).to eq(job)
    end

    it 'does not reserve jobs locked by other workers' do
      job = create_job
      other_worker = Delayed::Worker.new
      other_worker.name = 'other_worker'
      expect(described_class.reserve(other_worker)).to eq(job)
      expect(described_class.reserve(worker)).to be_nil
    end

    it 'reserves open jobs' do
      job = create_job
      expect(described_class.reserve(worker)).to eq(job)
    end

    it 'reserves expired jobs' do
      job = create_job(:locked_by => 'some other worker', :locked_at => described_class.db_time_now - Delayed::Worker.max_run_time - 1.minute)
      expect(described_class.reserve(worker)).to eq(job)
    end

    it 'reserves own jobs' do
      job = create_job(:locked_by => worker.name, :locked_at => (described_class.db_time_now - 1.minutes))
      expect(described_class.reserve(worker)).to eq(job)
    end
  end

  context '#name' do
    it 'is the class name of the job that was enqueued' do
      expect(described_class.create(:payload_object => ErrorJob.new).name).to eq('ErrorJob')
    end

    it 'is the method that will be called if its a performable method object' do
      job = described_class.new(:payload_object => NamedJob.new)
      expect(job.name).to eq('named_job')
    end

    it 'is the instance method that will be called if its a performable method object' do
      job = Story.create(:text => '...').delay.save
      expect(job.name).to eq('Story#save')
    end

    it 'parses from handler on deserialization error' do
      job = Story.create(:text => '...').delay.text
      job.payload_object.object.destroy
      expect(job.reload.name).to eq('Delayed::PerformableMethod')
    end
  end

  context 'worker prioritization' do
    after do
      Delayed::Worker.max_priority = nil
      Delayed::Worker.min_priority = nil
      Delayed::Worker.queue_attributes = {}
    end

    it 'fetches jobs ordered by priority' do
      10.times { described_class.enqueue SimpleJob.new, :priority => rand(10) }
      jobs = []
      10.times { jobs << described_class.reserve(worker) }
      expect(jobs.size).to eq(10)
      jobs.each_cons(2) do |a, b|
        expect(a.priority).to be <= b.priority
      end
    end

    it 'only finds jobs greater than or equal to min priority' do
      min = 5
      Delayed::Worker.min_priority = min
      [4, 5, 6].sort_by { |_i| rand }.each { |i| create_job :priority => i }
      2.times do
        job = described_class.reserve(worker)
        expect(job.priority).to be >= min
        job.destroy
      end
      expect(described_class.reserve(worker)).to be_nil
    end

    it 'only finds jobs less than or equal to max priority' do
      max = 5
      Delayed::Worker.max_priority = max
      [4, 5, 6].sort_by { |_i| rand }.each { |i| create_job :priority => i }
      2.times do
        job = described_class.reserve(worker)
        expect(job.priority).to be <= max
        job.destroy
      end
      expect(described_class.reserve(worker)).to be_nil
    end

    it 'sets job priority based on queue_attributes configuration' do
      Delayed::Worker.queue_attributes = {'job_tracking' => {:priority => 4}}
      job = described_class.enqueue :payload_object => NamedQueueJob.new
      expect(job.priority).to eq(4)
    end

    it 'sets job priority based on the passed in priority overrideing queue_attributes configuration' do
      Delayed::Worker.queue_attributes = {'job_tracking' => {:priority => 4}}
      job = described_class.enqueue :payload_object => NamedQueueJob.new, :priority => 10
      expect(job.priority).to eq(10)
    end
  end

  context 'clear_locks!' do
    before do
      @job = create_job(:locked_by => 'worker1', :locked_at => described_class.db_time_now)
    end

    it 'clears locks for the given worker' do
      described_class.clear_locks!('worker1')
      expect(described_class.reserve(worker)).to eq(@job)
    end

    it 'does not clear locks for other workers' do
      described_class.clear_locks!('different_worker')
      expect(described_class.reserve(worker)).not_to eq(@job)
    end
  end

  context 'unlock' do
    before do
      @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now)
    end

    it 'clears locks' do
      @job.unlock
      expect(@job.locked_by).to be_nil
      expect(@job.locked_at).to be_nil
    end
  end

  context 'large handler' do
    before do
      text = 'Lorem ipsum dolor sit amet. ' * 1000
      @job = described_class.enqueue Delayed::PerformableMethod.new(text, :length, {})
    end

    it 'has an id' do
      expect(@job.id).not_to be_nil
    end
  end

  context 'named queues' do
    context 'when worker has one queue set' do
      before(:each) do
        worker.queues = ['large']
      end

      it 'only works off jobs which are from its queue' do
        expect(SimpleJob.runs).to eq(0)

        create_job(:queue => 'large')
        create_job(:queue => 'small')
        worker.work_off

        expect(SimpleJob.runs).to eq(1)
      end
    end

    context 'when worker has two queue set' do
      before(:each) do
        worker.queues = %w[large small]
      end

      it 'only works off jobs which are from its queue' do
        expect(SimpleJob.runs).to eq(0)

        create_job(:queue => 'large')
        create_job(:queue => 'small')
        create_job(:queue => 'medium')
        create_job
        worker.work_off

        expect(SimpleJob.runs).to eq(2)
      end
    end

    context 'when worker does not have queue set' do
      before(:each) do
        worker.queues = []
      end

      it 'works off all jobs' do
        expect(SimpleJob.runs).to eq(0)

        create_job(:queue => 'one')
        create_job(:queue => 'two')
        create_job
        worker.work_off

        expect(SimpleJob.runs).to eq(3)
      end
    end
  end

  context 'max_attempts' do
    before(:each) do
      @job = described_class.enqueue SimpleJob.new
    end

    it 'is not defined' do
      expect(@job.max_attempts).to be_nil
    end

    it 'uses the max_attempts value on the payload when defined' do
      expect(@job.payload_object).to receive(:max_attempts).and_return(99)
      expect(@job.max_attempts).to eq(99)
    end
  end

  describe '#max_run_time' do
    before(:each) { @job = described_class.enqueue SimpleJob.new }

    it 'is not defined' do
      expect(@job.max_run_time).to be_nil
    end

    it 'results in a default run time when not defined' do
      expect(worker.max_run_time(@job)).to eq(Delayed::Worker::DEFAULT_MAX_RUN_TIME)
    end

    it 'uses the max_run_time value on the payload when defined' do
      expect(@job.payload_object).to receive(:max_run_time).and_return(30.minutes)
      expect(@job.max_run_time).to eq(30.minutes)
    end

    it 'results in an overridden run time when defined' do
      expect(@job.payload_object).to receive(:max_run_time).and_return(45.minutes)
      expect(worker.max_run_time(@job)).to eq(45.minutes)
    end

    it 'job set max_run_time can not exceed default max run time' do
      expect(@job.payload_object).to receive(:max_run_time).and_return(Delayed::Worker::DEFAULT_MAX_RUN_TIME + 60)
      expect(worker.max_run_time(@job)).to eq(Delayed::Worker::DEFAULT_MAX_RUN_TIME)
    end
  end

  describe 'destroy_failed_jobs' do
    context 'with a SimpleJob' do
      before(:each) do
        @job = described_class.enqueue SimpleJob.new
      end

      it 'is not defined' do
        expect(@job.destroy_failed_jobs?).to be true
      end

      it 'uses the destroy failed jobs value on the payload when defined' do
        expect(@job.payload_object).to receive(:destroy_failed_jobs?).and_return(false)
        expect(@job.destroy_failed_jobs?).to be false
      end
    end

    context 'with a job that raises DserializationError' do
      before(:each) do
        @job = described_class.new :handler => '--- !ruby/struct:GoingToRaiseArgError {}'
      end

      it 'falls back reasonably' do
        expect(YAML).to receive(:load_dj).and_raise(ArgumentError)
        expect(@job.destroy_failed_jobs?).to be true
      end
    end
  end

  describe 'yaml serialization' do
    context 'when serializing jobs' do
      it 'raises error ArgumentError for new records' do
        story = Story.new(:text => 'hello')
        if story.respond_to?(:new_record?)
          expect { story.delay.tell }.to raise_error(
            ArgumentError,
            "job cannot be created for non-persisted record: #{story.inspect}"
          )
        end
      end

      it 'raises error ArgumentError for destroyed records' do
        story = Story.create(:text => 'hello')
        story.destroy
        expect { story.delay.tell }.to raise_error(
          ArgumentError,
          "job cannot be created for non-persisted record: #{story.inspect}"
        )
      end
    end

    context 'when reload jobs back' do
      it 'reloads changed attributes' do
        story = Story.create(:text => 'hello')
        job = story.delay.tell
        story.text = 'goodbye'
        story.save!
        expect(job.reload.payload_object.object.text).to eq('goodbye')
      end

      it 'raises deserialization error for destroyed records' do
        story = Story.create(:text => 'hello')
        job = story.delay.tell
        story.destroy
        expect { job.reload.payload_object }.to raise_error(Delayed::DeserializationError)
      end
    end
  end

  describe 'worker integration' do
    before do
      Delayed::Job.delete_all
      SimpleJob.runs = 0
    end

    describe 'running a job' do
      it 'fails after Worker.max_run_time' do
        Delayed::Worker.max_run_time = 1.second
        job = Delayed::Job.create :payload_object => LongRunningJob.new
        worker.run(job)
        expect(job.error).to_not be_nil
        expect(job.reload.last_error).to match(/expired/)
        expect(job.reload.last_error).to match(/Delayed::Worker\.max_run_time is only 1 second/)
        expect(job.attempts).to eq(1)
      end

      context 'when the job raises a deserialization error' do
        after do
          Delayed::Worker.destroy_failed_jobs = true
        end

        it 'marks the job as failed' do
          Delayed::Worker.destroy_failed_jobs = false
          job = described_class.create! :handler => '--- !ruby/object:JobThatDoesNotExist {}'
          expect_any_instance_of(described_class).to receive(:destroy_failed_jobs?).and_return(false)
          worker.work_off
          job.reload
          expect(job).to be_failed
        end
      end
    end

    describe 'failed jobs' do
      before do
        @job = Delayed::Job.enqueue(ErrorJob.new, :run_at => described_class.db_time_now - 1)
      end

      after do
        # reset default
        Delayed::Worker.destroy_failed_jobs = true
      end

      it 'records last_error when destroy_failed_jobs = false, max_attempts = 1' do
        Delayed::Worker.destroy_failed_jobs = false
        Delayed::Worker.max_attempts = 1
        worker.run(@job)
        @job.reload
        expect(@job.error).to_not be_nil
        expect(@job.last_error).to match(/did not work/)
        expect(@job.attempts).to eq(1)
        expect(@job).to be_failed
      end

      it 're-schedules jobs after failing' do
        worker.work_off
        @job.reload
        expect(@job.last_error).to match(/did not work/)
        if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.4.0')
          # Ruby 3.4 produces a more verbose message
          expect(@job.last_error).to match(/sample_jobs.rb:\d+:in 'ErrorJob#perform'/)
        else
          expect(@job.last_error).to match(/sample_jobs.rb:\d+:in `perform'/)
        end
        expect(@job.attempts).to eq(1)
        expect(@job.run_at).to be > Delayed::Job.db_time_now - 10.minutes
        expect(@job.run_at).to be < Delayed::Job.db_time_now + 10.minutes
        expect(@job.locked_by).to be_nil
        expect(@job.locked_at).to be_nil
      end

      it 're-schedules jobs with handler provided time if present' do
        job = Delayed::Job.enqueue(CustomRescheduleJob.new(99.minutes))
        worker.run(job)
        job.reload

        expect((Delayed::Job.db_time_now + 99.minutes - job.run_at).abs).to be < 1
      end

      it "does not fail when the triggered error doesn't have a message" do
        error_with_nil_message = StandardError.new
        expect(error_with_nil_message).to receive(:message).twice.and_return(nil)
        expect(@job).to receive(:invoke_job).and_raise error_with_nil_message
        expect { worker.run(@job) }.not_to raise_error
      end
    end

    context 'reschedule' do
      before do
        @job = Delayed::Job.create :payload_object => SimpleJob.new
      end

      shared_examples_for 'any failure more than Worker.max_attempts times' do
        context "when the job's payload has a #failure hook" do
          before do
            @job = Delayed::Job.create :payload_object => OnPermanentFailureJob.new
            expect(@job.payload_object).to respond_to(:failure)
          end

          it 'runs that hook' do
            expect(@job.payload_object).to receive(:failure)
            worker.reschedule(@job)
          end

          it 'handles error in hook' do
            Delayed::Worker.destroy_failed_jobs = false
            @job.payload_object.raise_error = true
            expect { worker.reschedule(@job) }.not_to raise_error
            expect(@job.failed_at).to_not be_nil
          end
        end

        context "when the job's payload has no #failure hook" do
          # It's a little tricky to test this in a straightforward way,
          # because putting a not_to receive expectation on
          # @job.payload_object.failure makes that object incorrectly return
          # true to payload_object.respond_to? :failure, which is what
          # reschedule uses to decide whether to call failure. So instead, we
          # just make sure that the payload_object as it already stands doesn't
          # respond_to? failure, then shove it through the iterated reschedule
          # loop and make sure we don't get a NoMethodError (caused by calling
          # that nonexistent failure method).

          before do
            expect(@job.payload_object).not_to respond_to(:failure)
          end

          it 'does not try to run that hook' do
            expect do
              Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
            end.not_to raise_exception
          end
        end
      end

      context 'and we want to destroy jobs' do
        after do
          Delayed::Worker.destroy_failed_jobs = true
        end

        it_behaves_like 'any failure more than Worker.max_attempts times'

        it 'is destroyed if it failed more than Worker.max_attempts times' do
          expect(@job).to receive(:destroy)
          Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
        end

        it 'is destroyed if the job has destroy failed jobs set' do
          Delayed::Worker.destroy_failed_jobs = false
          expect(@job).to receive(:destroy_failed_jobs?).and_return(true)
          expect(@job).to receive(:destroy)
          Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
        end

        it 'is not destroyed if failed fewer than Worker.max_attempts times' do
          expect(@job).not_to receive(:destroy)
          (Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
        end
      end

      context "and we don't want to destroy jobs" do
        before do
          Delayed::Worker.destroy_failed_jobs = false
        end

        after do
          Delayed::Worker.destroy_failed_jobs = true
        end

        it_behaves_like 'any failure more than Worker.max_attempts times'

        context 'and destroy failed jobs is false' do
          it 'is failed if it failed more than Worker.max_attempts times' do
            expect(@job.reload).not_to be_failed
            Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
            expect(@job.reload).to be_failed
          end

          it 'is not failed if it failed fewer than Worker.max_attempts times' do
            (Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
            expect(@job.reload).not_to be_failed
          end
        end

        context 'and destroy failed jobs for job is false' do
          before do
            Delayed::Worker.destroy_failed_jobs = true
          end

          it 'is failed if it failed more than Worker.max_attempts times' do
            expect(@job).to receive(:destroy_failed_jobs?).and_return(false)
            expect(@job.reload).not_to be_failed
            Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
            expect(@job.reload).to be_failed
          end

          it 'is not failed if it failed fewer than Worker.max_attempts times' do
            (Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
            expect(@job.reload).not_to be_failed
          end
        end
      end
    end
  end
end