lib/qyu/models/job.rb
# frozen_string_literal: true
module Qyu
# Qyu::Job
class Job
attr_reader :descriptor, :payload, :workflow, :id, :created_at, :updated_at
## Class Methods
# Creates a new job via a workflow name / object and a payload
#
# @param [String, Qyu::Workflow] workflow to run
# @param [Hash] payload
# @return [Qyu::Job]
def self.create(workflow:, payload:)
workflow = Workflow.find_by(name: workflow) if workflow.is_a?(String)
id = persist(workflow, payload)
time = Time.now
new(id, workflow, payload, time, time)
end
# Finds Job by ID. Returns `nil` if job is not present in store
#
# @return [Qyu::Job, nil]
def self.find(id)
job_attrs = Qyu.store.find_job(id)
if job_attrs
new(id, job_attrs['workflow'], job_attrs['payload'],
job_attrs['created_at'], job_attrs['updated_at'])
end
end
def self.select(limit: 30, offset: 0, order: :asc)
job_records = Qyu.store.select_jobs(limit, offset, order)
job_records.map do |record|
new(record['id'], record['workflow'], record['payload'],
record['created_at'], record['updated_at'])
end
end
# Counts job in state store
#
# @return [Integer] jobs count
def self.count
Qyu.store.count_jobs
end
# Deletes job from state store by ID
#
# @param [Object] id
# @return [Object] deleted job
def self.delete(id)
Qyu.store.delete_job(id)
end
# Clears completed jobs
#
# @return [Integer] cleared jobs count
def self.clear_completed
Qyu.store.clear_completed_jobs
end
## Instance Methods
# Starts job execution
# Enqueues all tasks scheduled to start at the beginning (`starts` key in workflow descriptor)
#
# #=> job.start
def start
descriptor['starts'].each do |task_name|
create_task(nil, task_name, payload)
end
end
def queue_name(task_name)
descriptor['tasks'][task_name]['queue']
end
def next_task_names(src_task_name)
{
'without_params' => descriptor['tasks'][src_task_name]['starts'],
'with_params' => descriptor['tasks'][src_task_name]['starts_with_params']
}
end
def tasks_to_wait_for(task)
descriptor['tasks'][task.name]['waits_for'].keys
end
def sync_condition(task, next_task_name)
descriptor['tasks'][task.name]['waits_for'][next_task_name]['condition']
end
def create_task(parent_task, task_name, payload)
parent_task_id = parent_task.nil? ? nil : parent_task.id
Qyu.logger.debug "Task (ID=#{parent_task_id}) created a new task"
Qyu::Task.create(
queue_name: queue_name(task_name),
attributes: {
'name' => task_name,
'parent_task_id' => parent_task_id,
'job_id' => id,
'payload' => task_payload(payload, task_name)
})
end
def create_next_tasks(parent_task, payload)
Qyu.logger.debug "Creating next tasks for task (ID=#{parent_task.id})"
next_tasks = next_task_names(parent_task.name)
Qyu.logger.debug "Next task names: #{next_tasks}"
next_tasks['without_params']&.each do |next_task_name|
create_task(parent_task, next_task_name, payload)
end
next_tasks['with_params']&.each do |next_task_name, params|
updated_payload = payload.dup
params.each do |param_name, value_eqs|
f = value_eqs.keys[0]
x = value_eqs.values[0]
updated_payload[param_name] = calc_func_x(parent_task, f, x)
end
create_task(parent_task, next_task_name, updated_payload)
end
end
def find_task_ids_by_name(task_name)
Qyu.store.find_task_ids_by_job_id_and_name(id, task_name)
end
def find_task_ids_by_name_and_ancestor_task_id(task_name, ancestor_task_id)
ancestor_task_name = Qyu.store.find_task(ancestor_task_id)['name']
tasks_path = [task_name]
key_idx = 0
while tasks_path[-1] != ancestor_task_name
found_task = descriptor['tasks'].detect do |_, desc|
all_task_names = []
all_task_names.concat(desc['starts'] || [])
all_task_names.concat((desc['starts_with_params'] || {}).keys)
all_task_names.concat(desc['starts_parallel'] || [])
all_task_names.concat(desc['starts_manually'] || [])
all_task_names.include?(tasks_path[-1])
end
tasks_path << found_task[key_idx] if found_task
end
tasks_topdown_path = tasks_path.reverse
# remove topmost task (ancestor_task) from the path
tasks_topdown_path.shift
# traverse task tree from top down, and find the <task_name> "descendants" of <ancestor_task>
parent_task_ids = [ancestor_task_id]
tasks_topdown_path.each do |t_name|
parent_task_ids = Qyu.store.find_task_ids_by_job_id_name_and_parent_task_ids(id, t_name, parent_task_ids)
end
parent_task_ids
end
def task_status_counts
Qyu.store.task_status_counts(id)
end
def [](attribute)
public_send(attribute)
end
private_class_method :new
private
def initialize(id, workflow, payload, created_at = nil, updated_at = nil)
@workflow = Qyu::Workflow.new(workflow['id'], workflow['name'], workflow['descriptor'])
@descriptor = @workflow['descriptor']
@payload = payload
@id = id
@created_at = created_at
@updated_at = updated_at
end
def self.persist(workflow, payload)
workflow = Qyu::Workflow.find_by(name: workflow) if workflow.is_a?(String)
Qyu.store.persist_job(workflow, payload)
end
def calc_func_x(task, func, x)
if func == 'count'
find_task_ids_by_name_and_ancestor_task_id(x, task.id).count
else
fail Qyu::Errors::NotImplementedError
end
end
def task_payload(payload, task_name)
shared_payload = payload.dup.reject { |k, _v| task_name?(k) }
shared_payload.merge!(payload[task_name]) if payload[task_name].is_a?(Hash)
shared_payload
end
def task_name?(string)
descriptor['tasks'].keys.include?(string)
end
end
end