app/models/job.rb
# frozen_string_literal: true
require 'active_support/time'
require 'ice_cube_cron'
require 'chronic'
require 'trends'
#
# Job model
#
class Job < ApplicationRecord
acts_as_taggable
belongs_to :user
has_many :job_runs, dependent: :delete_all

validates :name, presence: true
validates :name, uniqueness: { scope: :user_id }
validates :cron, presence: true
validates :command, presence: true

# Enabled, idle jobs whose next_run is already in the past.
scope :schedulable, -> { where('next_run < ? and enabled = ? and running = ?', Time.current.utc, true, false) }
# Jobs owned by the given user id.
scope :mine, ->(user_id) { where(user_id: user_id) }
# Equality filters on the enabled / running flags.
scope :enabled, ->(enabled) { where(enabled: enabled) }
scope :running, ->(running) { where(running: running) }
# Jobs with a last_run recorded whose success flag equals the argument.
scope :successful, ->(successful) { where(success: successful).where.not(last_run: nil) }
# Substring match on name; LIKE wildcards in the input are escaped first.
scope :named_like, ->(name) { where('name like ?', "%#{sanitize_sql_like(name)}%") }
# Exact match on the timezone column.
scope :timezone, ->(timezone) { where(timezone: timezone) }
# Substring match on command; LIKE wildcards in the input are escaped first.
scope :command, ->(command) { where('command like ?', "%#{sanitize_sql_like(command)}%") }
# Restrict the relation to the given job ids.
scope :include_by_ids, ->(ids) { where('jobs.id in (?)', ids) }
# The user's jobs that have a last_run, newest run first, limited to count.
scope :recent, ->(user_id, count) { mine(user_id).where.not(last_run: nil).order(last_run: :desc).limit(count) }
# The user's enabled, idle jobs whose next_run is in the future, soonest first,
# limited to count. Uses the same clock source as :schedulable.
scope :upcoming, lambda { |user_id, count|
  mine(user_id)
    .where('next_run > ? and enabled = ? and running = ?', Time.current.utc, true, false)
    .order(next_run: :asc)
    .limit(count)
}

# Tag matching modes understood by search_tags.
TAG_SEARCH = %w[any all exclude].freeze
# Refresh the next run time whenever the cron expression or timezone is about to be saved with a change.
before_save :update_cron, if: ->(job) { job.cron_changed? || job.timezone_changed? }

# Ask Cronex to describe the cron expression; if that raises, record a validation error on :cron.
before_validation(on: %i[create update]) do
  begin
    Cronex::ExpressionDescriptor.new(cron).description
  rescue StandardError
    errors.add(:cron, "'#{cron}' is an invalid cron expression.")
  end
end
# before_save hook (see the callback declaration): defaults a blank timezone
# to 'UTC' and recomputes next_run.
#
# Raises TZInfo::InvalidTimezoneIdentifier when the next run cannot be
# computed for any reason. The original error is kept as the exception's
# cause and its text is included in the message, so the real failure
# (bad timezone or bad cron expression) is not lost.
def update_cron
  self.timezone = 'UTC' if timezone.blank?
  update_next_run
rescue StandardError => e
  raise TZInfo::InvalidTimezoneIdentifier,
        "Unable to compute the next run for timezone '#{timezone}' and cron '#{cron}': #{e.message}"
end
# Apply tags to this job on behalf of its user.
#
# A blank value (nil or empty) clears tag_list. Unless tags is nil, the owning
# user is then asked to tag this job with the given value on the :tags context.
# Returns whatever user.tag returns, or nil when tags is nil.
def tag(tags)
  self.tag_list = '' if tags.blank?
  return if tags.nil?

  user.tag(self, with: tags, on: :tags)
end
# Whether this job's schedule has an occurrence between the two dates.
# Delegates to the schedule's occurs_between? and returns its answer.
def occurs_between(start_date, end_date)
  job_schedule = schedule
  job_schedule.occurs_between?(start_date, end_date)
end
# List this job's schedule occurrences up to end_date.
# Delegates to the schedule's occurrences and returns its result.
def occurrences(end_date)
  job_schedule = schedule
  job_schedule.occurrences(end_date)
end
# Called when a job starts: creates a JobRun for it, marks the job as running
# and saves the job.
#
# is_manual - truthy when the run was not triggered by the schedule; such runs
#             get no scheduled_start_time.
#
# Returns the JobRun produced by JobRun.create.
def start_job(is_manual)
  # Sample the clock once so start_time and the lag describe the same instant.
  started_at = Time.current
  # nil.to_i is 0, so without this guard a missing next_run would be recorded
  # as a lag equal to the whole Unix timestamp.
  lag_seconds = next_run ? [0, started_at.to_i - next_run.to_i].max : 0
  run = JobRun.create(
    job: self,
    start_time: started_at,
    scheduled_start_time: is_manual ? nil : next_run,
    is_manual: is_manual,
    schedule_diff_in_seconds: lag_seconds
  )
  self.running = true
  save
  run
end
# Called when a job completes: records the outcome on the run and updates the job.
#
# The job's success flag is set from the argument and the run is updated via
# run.update_run. If that raises a StandardError the job is marked as failed
# and the error is logged instead of propagated.
#
# In every case last_run is stamped, running is cleared, next_run is
# recomputed and the job is saved. running is cleared and the save is
# attempted even when recomputing next_run raises, so a scheduling error
# cannot leave the job stuck in the running state; that error still
# propagates to the caller.
def stop_job(run, return_code, success, error_message, stdout, stderr)
  self.success = success
  run.update_run(return_code, success, error_message, stdout, stderr)
rescue StandardError => e
  self.success = false
  logger.error "Failed to update the job run for job #{id} : #{e}"
ensure
  self.last_run = Time.current
  self.running = false
  begin
    update_next_run
  ensure
    save
  end
end
# Fetch this job's runs that match a set of optional criteria, newest start first.
#
# start_dt - text handed to Chronic.parse; when present and parseable, only
#            runs matched by JobRun's started_after scope are kept.
# end_dt   - text handed to Chronic.parse; when present and parseable, only
#            runs matched by JobRun's ended_before scope are kept.
# success  - passed to the succeeded scope. nil or blank text means "do not
#            filter"; a boolean false DOES filter.
#
# Blank or unparseable dates are ignored rather than treated as errors.
def filter_runs(start_dt, end_dt, success)
  started_from = start_dt.blank? ? nil : Chronic.parse(start_dt)
  ended_by = end_dt.blank? ? nil : Chronic.parse(end_dt)

  query = job_runs
  query = query.started_after(started_from) if started_from
  query = query.ended_before(ended_by) if ended_by
  # false.blank? is true, so checking blank? alone would silently drop a
  # request for failed runs made with a real boolean.
  query = query.succeeded(success) if success == false || !success.blank?
  query.order(start_time: :desc)
end
#
# Turn a search expression into arguments for find_jobs and return its result.
#
# The expression is a whitespace-separated list of terms, so a value cannot
# contain spaces. Recognised terms:
#   name:VALUE, command:VALUE (cmd:), timezone:VALUE (tz:)
#   running / executing, stopped
#   enabled, disabled
#   succeeded / success / successful, failed / fail / failing / broken
#   start_date:VALUE, end_date:VALUE
#   tags:LIST (tag:) - LIST may start with '!' (exclude), '&' (all) or
#                      '|' (any); with no prefix the mode is 'any'.
# Unrecognised terms are ignored. Only the first ':' separates the prefix from
# the value, so values such as URLs or times keep their own colons. A term
# with an empty value (for example "name:") is treated as having no value.
#
def self.search_jobs(user, expression)
  tag_modes = { '!' => 'exclude', '&' => 'all', '|' => 'any' }
  filters = {}

  expression.to_s.split.each do |piece|
    prefix, search = piece.split(':', 2)
    search = nil if search == ''

    case prefix
    when 'name' then filters[:name] = search
    when 'command', 'cmd' then filters[:command] = search
    when 'timezone', 'tz' then filters[:timezone] = search
    when 'running', 'executing' then filters[:running] = true
    when 'stopped' then filters[:running] = false
    when 'enabled' then filters[:enabled] = true
    when 'disabled' then filters[:enabled] = false
    when 'succeeded', 'success', 'successful' then filters[:succeeded] = true
    when 'failed', 'fail', 'failing', 'broken' then filters[:succeeded] = false
    when 'start_date' then filters[:occur_start] = search
    when 'end_date' then filters[:occur_end] = search
    when 'tags', 'tag'
      # A bare "tags:" has no value; treat it as an empty list instead of
      # calling string methods on nil.
      term = search.to_s
      mode = tag_modes[term[0]]
      filters[:incexc] = mode || 'any'
      filters[:tags] = mode ? term[1..] : term
    end
  end

  find_jobs(
    user,
    *filters.values_at(:occur_start, :occur_end, :tags, :incexc, :enabled, :succeeded, :running, :name, :timezone,
                       :command)
  )
end
# Build a relation of the user's jobs narrowed by every filter that is supplied.
#
# enabled / succeeded / running are applied unless nil; name / timezone /
# command / tags are applied unless blank. occur_start and occur_end are text
# handed to Chronic.parse; when both are given and both parse, only jobs whose
# schedule occurs inside that window are kept. If either date cannot be
# parsed, the occurrence filter is skipped.
#
# Returns the relation ordered by next_run, then id.
def self.find_jobs(user, occur_start, occur_end, tags, incexc, enabled, succeeded, running, name, timezone, command)
  search = Job.mine(user)
  search = search.enabled(enabled) unless enabled.nil?
  search = search.running(running) unless running.nil?
  search = search.successful(succeeded) unless succeeded.nil?
  search = search.named_like(name) unless name.blank?
  search = search.timezone(timezone) unless timezone.blank?
  search = search.command(command) unless command.blank?
  search = search.merge(Job.search_tags(search, tags, incexc)) unless tags.blank?

  # Occurrence is a special case since we can't do it with an ActiveRecord relation.
  if occur_start && occur_end
    # Parse once up front: the window is the same for every job.
    window_start = Chronic.parse(occur_start)
    window_end = Chronic.parse(occur_end)
    if window_start && window_end
      job_ids = []
      search.find_each do |job|
        job_ids << job.id if job.occurs_between(window_start, window_end)
      end
      search = job_ids.empty? ? Job.none : search.include_by_ids(job_ids)
    end
  end

  search.order('next_run, id asc')
end
# Narrow a relation of jobs by tag.
#
# jobs   - relation responding to tagged_with.
# tags   - comma-separated tag names (whitespace around each name is
#          stripped), or nil, which is passed through to tagged_with as nil.
# incexc - matching mode, case-insensitive: one of Job::TAG_SEARCH
#          ('any', 'all', 'exclude'). Anything else falls back to 'all'.
#
# Returns the result of tagged_with called with any: true, exclude: true or
# match_all: true according to the mode.
def self.search_tags(jobs, tags, incexc)
  # Normalise before the lookup so 'ANY' or 'Exclude' are recognised rather
  # than silently falling back to 'all'.
  mode = incexc.to_s.downcase
  mode = 'all' unless Job::TAG_SEARCH.include?(mode)
  tag_names = tags&.split(',')&.map(&:strip)

  case mode
  when 'any' then jobs.tagged_with(tag_names, any: true)
  when 'exclude' then jobs.tagged_with(tag_names, exclude: true)
  else jobs.tagged_with(tag_names, match_all: true)
  end
end
#
# Recompute and persist this job's run statistics.
#
# Reads runtime and schedule-lag figures for this job from JobRun, stores the
# average / max / min values on the job, and stores the slope of each series'
# trend line (0 when a series has fewer than two points). Saves with save!.
# Any StandardError raised along the way is logged, not propagated.
#
def compute_run_stats
  durations = JobRun.job_runtimes(id)
  mean_duration = JobRun.avg_runtime_for_job(id)
  longest_duration = JobRun.max_runtime_for_job(id)
  shortest_duration = JobRun.min_runtime_for_job(id)
  lags = JobRun.job_schedule_diffs(id)
  mean_lag = JobRun.avg_job_lag_for_job(id)
  longest_lag = JobRun.max_job_lag_for_job(id)
  shortest_lag = JobRun.min_job_lag_for_job(id)

  # A trend line needs at least two points; report a flat trend otherwise.
  slope_of = ->(series) { series.length < 2 ? 0 : series.trend_line[:slope] }

  self.avg_run_duration_trend = slope_of.call(durations)
  self.avg_run_duration = mean_duration
  self.max_run_duration = longest_duration
  self.min_run_duration = shortest_duration
  self.avg_run_lag_trend = slope_of.call(lags)
  self.avg_run_lag = mean_lag
  self.max_run_lag = longest_lag
  self.min_run_lag = shortest_lag
  save!
rescue StandardError => e
  logger.error "Failed to compute run stats for job ID #{id}: #{e}"
end
# Hash used when the job is rendered as JSON. The options argument is ignored.
#
# Keys are emitted in the order listed below. tags is the list of tag names;
# last_run, success_callback and failure_callback become '' when unset.
def as_json(_options = {})
  empty_string_when_unset = %i[last_run success_callback failure_callback]
  ordered_fields = %i[
    id name cron enabled command last_run next_run running success created_at updated_at timezone tags
    success_callback failure_callback avg_run_duration max_run_duration min_run_duration
    avg_run_duration_trend avg_run_lag max_run_lag min_run_lag avg_run_lag_trend
  ]

  ordered_fields.each_with_object({}) do |field, payload|
    value = field == :tags ? tags.collect(&:name) : public_send(field)
    value ||= '' if empty_string_when_unset.include?(field)
    payload[field] = value
  end
end
private
# Recompute next_run (stored in UTC) from the cron expression, evaluated with
# Time.zone temporarily switched to the job's timezone.
#
# Time.use_zone restores the previous zone even when the computation raises,
# so a failure here cannot leave the job's timezone set as the current zone
# for the rest of the thread.
def update_next_run
  Time.use_zone(timezone) do
    self.next_run = Fugit::Cron.parse(cron).next_time.utc
  end
end
# Build an IceCube schedule from the cron expression, starting at the current
# time expressed in the job's timezone. A new schedule is built on every call.
def schedule
  ::IceCube::Schedule.from_cron(DateTime.now.in_time_zone(timezone), cron)
end
end