openjaf/cenit

View on GitHub
app/models/setup/task.rb

Summary

Maintainability
D
3 days
Test Coverage
module Setup
  class Task
    include CenitScoped
    include ClassHierarchyAware
    include CrossOrigin::CenitDocument
    include FieldsInspection

    origins :default, -> { ::User.super_access? ? :admin : nil }

    STATUS = [:pending, :running, :failed, :completed, :retrying, :broken, :unscheduled, :paused]
    ACTIVE_STATUS = [:running, :retrying]
    NON_ACTIVE_STATUS = STATUS - ACTIVE_STATUS
    RUNNING_STATUS = ACTIVE_STATUS + [:paused]
    ALIVE_STATUS = RUNNING_STATUS + [:pending]
    NOT_RUNNING_STATUS = STATUS - RUNNING_STATUS
    FINISHED_STATUS = NOT_RUNNING_STATUS - [:pending]

    build_in_data_type.excluding(:thread_token).and(
      label: '{{description}}',
      properties: {
        status: {
          enum: STATUS.collect(&:to_s)
        }
      }
    )

    deny :create, :update

    field :message, type: Hash, default: {}
    field :description, type: String
    field :status, type: StringifiedSymbol, default: :pending
    field :progress, type: Float, default: 0
    field :attempts, type: Integer, default: 0
    field :succeded, type: Integer, default: 0
    field :retries, type: Integer, default: 0
    field :state, type: Hash, default: {}
    field :auto_retry, type: StringifiedSymbol, default: -> { auto_retry_enum.first }
    field :resumes, type: Integer, default: 0

    belongs_to :current_execution, class_name: Setup::Execution.to_s, inverse_of: nil
    has_many :executions, class_name: Setup::Execution.to_s, inverse_of: :task, dependent: :destroy

    has_many :notifications, class_name: Setup::SystemNotification.to_s, inverse_of: :task, dependent: :destroy

    belongs_to :thread_token, class_name: ThreadToken.to_s, inverse_of: nil
    belongs_to :scheduler, class_name: Setup::Scheduler.to_s, inverse_of: nil

    has_and_belongs_to_many :joining_tasks, class_name: Setup::Task.to_s, inverse_of: nil

    inspect_fields :progress, :description, :state

    validates_inclusion_of :status, in: ->(t) { t.status_enum }
    validates_numericality_of :progress, greater_than_or_equal_to: 0, less_than_or_equal_to: 100
    validates_presence_of :auto_retry

    before_save do
      message.delete(:task)
      self.description = auto_description if description.blank?
      check_scheduler(scheduler, :to_errors)
      self.progress = progress.round(1)
      abort_if_has_errors
    end

    before_destroy { NON_ACTIVE_STATUS.include?(status) && (scheduler.nil? || scheduler.deactivated?) }

    def check_scheduler(scheduler, report = :none)
      if scheduler && scheduler.origin != origin
        error = "with incompatible origin (#{scheduler.origin}), #{origin} origin is expected"
        case report
          when :to_errors
            errors.add(:scheduler, error)
          when :exception
            fail "Scheduler #{error}"
          else
            # Nothing to do here
        end
        false
      else
        true
      end
    end

    def save(options = {})
      options[:inspect_fields] = thread_token.present? && Thread.current[:task_token] == thread_token.token
      super
    end

    def _type_enum
      classes = Setup::Task.class_hierarchy
      classes.delete(Setup::Task)
      classes.delete(::ScriptExecution)
      classes.collect(&:to_s)
    end

    def auto_retry_enum
      self.class.auto_retry_enum
    end

    def auto_description
      to_s
    end

    def to_s
      "#{self.class.to_s.split('::').last.to_title} ##{id}"
    end

    def status_enum
      STATUS
    end

    def attempts_succeded
      "#{attempts}/#{succeded}"
    end

    def running_status?
      RUNNING_STATUS.include?(status)
    end

    def running?
      running_status? &&
        thread_token.present? &&
        Thread.list.any? { |thread| thread[:task_token] == thread_token.token }
    end

    def new_execution
      self.current_execution = Setup::Execution.create(task: self)
      save
      current_execution
    end

    def queue_execution
      if current_execution&.status == :pending
        current_execution
      else
        new_execution
      end
    end

    def maximum_resumes
      Cenit.maximum_task_resumes
    end

    def execute(options = {})
      task_desc = description.presence || "Task ##{id}"
      if running? || !Cenit::Locker.lock(self)
        notify(message: "Executing task ##{id} at #{Time.now} but it is already running")
      else
        thread_token.destroy if thread_token.present?
        self.thread_token = ThreadToken.create
        self.retries += 1 if status == :retrying || status == :failed
        self.current_execution =
          begin
            Setup::Execution.find(options[:execution_id])
          rescue
            queue_execution
          end
        time = Time.now
        if running_status?
          self.resumes += 1
          fail Broken, "Maximum task resumes exceeded (#{resumes})" if resumes > maximum_resumes
          notify(message: "Restarting #{task_desc} at #{time}", type: :notice)
        else
          self.attempts += 1
          self.progress = 0
          self.status = :running
          self.resumes = 0
          notify(type: :info, message: "#{task_desc} started at #{time}")
        end
        Thread.current[:task_token] = thread_token.token
        current_execution.start(time: time)
        before_run_ex = nil
        do_run =
          begin
            before_run
          rescue ::Exception => ex
            before_run_ex = ex
            false
          end
        if do_run
          begin
            run(message)
          ensure
            Task.execution_done(self)
          end
          time = Time.now
          if resuming_later?
            finish(:paused, "#{task_desc} paused at #{time}", :notice, time)
          else
            self.state = {}
            self.progress = 100
            finish(:completed, "#{task_desc} completed at #{time}", :info, time)
          end
        else
          if before_run_ex
            finish(:failed, before_run_ex.message, :error, time)
          else
            finish(:failed, "#{task_desc} wasn't executed!", :warning, time)
          end
        end
      end
    rescue ::Exception => ex
      time = Time.now
      if ex.is_a?(Task::Exception)
        finish(ex.status, ex.message, ex.message_type, time)
      else
        @finish_attachment =
          {
            filename: 'backtrace.txt',
            contentType: 'plain/text',
            body: "#{ex.message}\n\n#{ex.backtrace.join("\n")}"
          }
        finish(:failed, "[#{ex.class}] #{ex.message&.capitalize}", :error, time)
      end
    ensure
      reload
      if joining_tasks.present?
        joining_tasks.each(&:retry)
        joining_tasks.nullify
      end
      Cenit::Locker.unlock(self)
    end

    def run(_message)
      fail NotImplementedError
    end

    def before_run
      Cenit::MultiTenancy.current_tenant.check_enabled!
    end

    protected :before_run

    def break(message = nil)
      raise Broken.new(message)
    end

    def unschedule
      task_desc = description.presence || "Task ##{id}"
      finish(:unscheduled, "#{task_desc} unscheduled at #{time = Time.now}", :warning, time)
    end

    def notify(attrs_or_exception)
      notification =
        case attrs_or_exception
          when Hash
            Setup::SystemNotification.create_with(attrs_or_exception)
          when Exception, StandardError
            Setup::SystemNotification.create_from(attrs_or_exception)
          else
            nil
        end
      if notification
        notifications << notification
        if current_execution
          current_execution.notifications << notification
        end
      end
      save
    end

    def can_retry?
      !running?
    end

    def can_schedule?
      can_retry?
    end

    def schedule(scheduler, report = :none)
      if can_schedule? && check_scheduler(scheduler, report)
        self.scheduler = scheduler
        self.retry(action: 'scheduled')
      end
    end

    def retry(options = {})
      if can_retry?
        self.status = (status == :failed ? :retrying : :pending)
        task_desc = description.presence || "Task ##{id}"
        notify(type: :notice, message: "#{task_desc} #{options[:action] || 'executed'} at #{Time.now}")
        Cenit::Rabbit.enqueue(message.merge(task: self))
      end
    end

    attr_reader :finish_attachment

    def resuming_manually?
      @resuming_manually
    end

    def resume_manually
      resume_later
      @resuming_manually = true
    end

    def resuming_later?
      @resuming_later
    end

    def resume_later
      fail 'Resume later is already invoked for these task' if @resuming_later
      @resuming_later = true
    end

    def resume_in(interval)
      resume_later
      @resume_in =
        if interval.is_a?(Integer)
          interval
        else
          interval.to_s.to_seconds_interval
        end
    end

    def run_again
      resume_in(0)
    end

    def resume_interval
      @resume_in
    end

    def join(task)
      task.joining_tasks << self
      @joining = true
    end

    def joining?
      @joining.to_b
    end

    def agent_id
      (agent = self.agent) && agent.id
    end

    def agent
      send(self.class.agent_field || :itself)
    end

    def agent_model
      self.class.agent_model
    end

    def agent_from_msg
      @agent_from_msg ||= self.class.agent_from(message)
    end

    def write_attribute(name, value)
      if name.to_sym == self.class.agent_field
        @agent_from_msg = nil
      end
      super
    end

    class << self

      def current
        (thread_token = ThreadToken.where(token: Thread.current[:task_token]).first) &&
          Setup::Task.where(thread_token: thread_token).first
      end

      def break(message)
        if current
          raise Broken.new(message)
        else
          Tenant.notify(message: "Calling break outside task execution context (msg: #{message})", type: :warning)
        end
      end

      def auto_retry_enum
        %w(manually automatic).collect(&:to_sym)
      end

      def process(message = {}, &block)
        message[:task] = self unless (task = message[:task]).is_a?(self) || (task.is_a?(Class) && task < self)
        Cenit::Rabbit.enqueue(message, &block)
      end

      def agent_field(*args)
        if args.length.positive?
          @agent_field = args[0]
          if (@agent_id_msg_key = args[1])
            before_save do
              self.send("#{self.class.agent_field}=", agent_from_msg)
            end
          end
        else
          @agent_field || superclass.try(:agent_field)
        end
      end

      def agent_id_msg_key
        @agent_id_msg_key || superclass.try(:agent_id_msg_key)
      end

      def agent_from(msg)
        (key = agent_id_msg_key) &&
          agent_model.where(id: msg[key]).first
      end

      def agent_model
        if (field = agent_field)
          reflect_on_association(field).klass
        else
          self
        end
      end

      def on_executed(&block)
        (@on_executed ||= []) << block if block
      end

      def execution_done(task)
        if @on_executed
          @on_executed.each do |callback|
            begin
              callback.call(task)
            rescue Exception => ex
              SystemReport.create_from(ex, 'Task execution callback')
            end
          end
        end
      end
    end

    class Exception < ::Exception

      def initialize(msg, options = {})
        super(msg)
        @options = options || {}
      end

      def status
        @options[:status] || :failed
      end

      def message_type
        @options[:message_type] || :error
      end
    end

    class Broken < Exception
      def initialize(msg)
        super(msg, status: :broken, message_type: :warning)
      end
    end

    private

    def clear_resume
      @resuming_later = false
      self.resumes = 0
    end

    def finish(status, message, message_type, time)
      self.status = status
      thread_token.destroy if thread_token.present?
      self.thread_token = nil
      Thread.current[:task_token] = nil
      if status == :completed
        self.succeded += 1
        self.retries = 0
      elsif status == :broken
        clear_resume
      elsif status == :failed
        clear_resume
        if auto_retry == :automatic
          resume_in case retries
                      when 0
                        '5s'
                      when 1
                        '1m'
                      when 2
                        '3m'
                      when 3
                        '5m'
                      when 4
                        '10m'
                      when 5
                        '30m'
                      when 6
                        '1h'
                      else
                        '1d'
                    end
        end
      end
      notify(type: message_type, message: message, attachment: finish_attachment)
      if current_execution
        current_execution.finish(status: status, time: time)
        self.current_execution = nil
      end
    end

  end
end