cblavier/jobbr

View on GitHub
app/models/jobbr/job.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Jobbr

  class Job < ::Ohm::Model

    MAX_RUN_PER_JOB = 500

    include ::Ohm::Timestamps
    include ::Ohm::DataTypes
    include ::Ohm::Callbacks

    attribute :type
    attribute :delayed, Type::Boolean
    attribute :scheduled, Type::Boolean

    collection :runs, 'Jobbr::Run'

    index :type
    index :delayed
    index :scheduled

    def self.instance(instance_class_name = nil)
      if instance_class_name
        job_class = instance_class_name.camelize.constantize
      else
        job_class = self
      end

      job = Job.find(type: job_class.to_s).first
      if job.nil?
        delayed = job_class.included_modules.include?(Jobbr::Delayed)
        scheduled = job_class.included_modules.include?(Jobbr::Scheduled)
        job = Job.create(type: job_class.to_s, delayed: delayed, scheduled: scheduled)
      end
      job
    end

    def self.run_by_name(name, *args)
      instance(name).run(*args)
    end

    def self.run(*args)
      instance.run(*args)
    end

    def self.description(desc = nil)
      @description = desc if desc
      @description
    end

    def self.delayed
      find(delayed: true)
    end

    def self.scheduled
      find(scheduled: true)
    end

    def self.count
      all.count
    end

    def self.by_name(name)
      class_name = name.underscore.camelize
      Job.find(type: class_name).first
    end

    # overriding Ohm find to get Sidekiq to find job instances
    def self.find(id)
      if id.instance_of?(Hash)
        super
      elsif job = Jobbr::Job[id]
        job.send(:typed_self)
      end
    end

    def run(params = {}, options = {})
      options = { delayed: true }.merge(options)
      job_run = Run.create(status: :waiting, started_at: Time.now, job: self)
      if options[:delayed] && self.delayed && !Rails.env.test? && !Rails.env.ci?
        delayed_options = { retry: 0, backtrace: true }
        delayed_options[:queue] = typed_self.class.queue if typed_self.class.queue
        delayed_options['throttle'] = typed_self.class.throttle if typed_self.class.throttle
        typed_self.delay(delayed_options.merge(options)).inner_run(job_run.id, params)
        Rails.logger.debug("sidekiq options: #{delayed_options.merge(options).inspect}")
      else
        self.inner_run(job_run.id, params)
      end
      job_run
    end

    def handle_process_interruption(job_run, signals)
      signals.each do |signal|
        Signal.trap(signal) do
          job_run.status = :failed
          job_run.logger.error("Job interrupted by a #{signal} signal")
          job_run.finished_at = Time.now
          job_run.save
        end
      end
    end

    def every
      if scheduled?
        require self.type.underscore
        Object::const_get(self.type).every
      else
        nil
      end
    end

    def last_run
      @last_run ||= self.ordered_runs.first
    end

    def average_run_time
      return 0 if runs.empty?
      (runs.map { |run| run.run_time }.compact.inject { |sum, el| sum + el }.to_f / runs.count).round(2)
    end

    def to_param
      self.type.underscore.dasherize.gsub('/', '::')
    end

    def name
      self.type.demodulize.underscore.humanize
    end

    def scheduled?
      !self.delayed
    end

    def delayed?
      self.delayed
    end

    def ordered_runs
      self.runs.sort_by(:started_at, order: 'ALPHA DESC')
    end

    def after_delete
      self.runs.each(&:delete)
    end

    def perform
      raise NotImplementedError.new :message => 'Must be implemented'
    end

    protected

    # mocking purpose
    def max_run_per_job
      MAX_RUN_PER_JOB
    end

    # prevents Run collection to grow beyond max_run_per_job
    def cap_runs!
      runs_count = self.runs.count
      if runs_count > max_run_per_job
        runs.sort_by(:started_at, order: 'ALPHA ASC', limit: [0, runs_count - max_run_per_job]).each do |run|
          if run.status == :failed || run.status == :success
            run.delete
          end
        end
      end
    end

    def inner_run(job_run_id, params = {})
      job_run = Run[job_run_id]
      job_run.status = :running
      job_run.started_at = Time.now
      job_run.save

      cap_runs!

      handle_process_interruption(job_run, ['TERM', 'INT'])

      begin
        job_run.logger.debug("Starting with params #{params.inspect}")
        perform(job_run, params)
        job_run.status = :success
        job_run.progress = 100
      rescue Exception => e
        job_run.status = :failed
        job_run.logger.error(e.message)
        job_run.logger.error(e.backtrace)
        raise e
      ensure
        job_run.finished_at = Time.now
        job_run.save
      end
    end

    def perform(job_run, params)
      case typed_self.method(:perform).parameters.length
      when 0 then typed_self.perform
      when 1 then typed_self.perform(job_run)
      when 2 then typed_self.perform(job_run, params)
      end
    end

    # working around lack of polymorphism in Ohm
    # using type attributed to get a typed instance
    def typed_self
      @typed_self ||= Object::const_get(self.type).new(id: self.id)
    end

  end

end