tumugi/tumugi

View on GitHub
lib/tumugi/task.rb

Summary

Maintainability
B
5 hrs
Test Coverage
require 'tumugi/event'
require 'tumugi/logger/scoped_logger'
require 'tumugi/mixin/listable'
require 'tumugi/mixin/task_helper'
require 'tumugi/mixin/parameterizable'
require 'tumugi/mixin/human_readable'

module Tumugi
  # Base class for a single unit of work.
  #
  # Subclasses must override #run (the default raises NotImplementedError) and
  # may override #requires, #output and #timeout. The task state is only
  # changed through #trigger! and is read through #state; both take the same
  # mutex.
  class Task
    include Tumugi::Mixin::Parameterizable
    include Tumugi::Mixin::Listable
    include Tumugi::Mixin::TaskHelper
    include Tumugi::Mixin::HumanReadable

    attr_reader :visible_at, :tries, :max_retry, :retry_interval, :elapsed_time

    # Overrides the identifier that #id would otherwise derive from the class name.
    attr_writer :id

    # Every state a task can be in.
    AVAILABLE_STATES = [
      :pending,
      :running,
      :completed,
      :skipped,
      :failed,
      :requires_failed,
    ].freeze

    # Event accepted by #trigger! => state the task moves into.
    EVENT_TO_STATE = {
      skip: :skipped,
      start: :running,
      pend: :pending,
      complete: :completed,
      fail: :failed,
      requires_fail: :requires_failed,
    }.freeze
    private_constant :EVENT_TO_STATE

    # Sets up a pending task that is visible immediately, has not been tried
    # yet, and takes its retry limits from Tumugi.config.
    def initialize
      super()
      @visible_at = Time.now
      @tries = 0
      @max_retry = Tumugi.config.max_retry
      @retry_interval = Tumugi.config.retry_interval
      @state = :pending
      @lock = Mutex.new
      @_elapsed_times = [] # one { start:, end: } hash per attempt, indexed by tries
      @elapsed_time = '00:00:00'
    end

    # @return [String] the explicitly assigned id, or the class name by default
    def id
      @id ||= self.class.name
    end

    # Two tasks are eql? when their hashes (derived from #id) match.
    def eql?(other)
      hash == other.hash
    end

    # @return [Integer] hash of #id, so tasks with the same id share a hash key
    def hash
      id.hash
    end

    # Returns the task itself. Dependencies are always accessed through
    # `.instance` in this class.
    # NOTE(review): presumably this lets task proxies and real tasks be used
    # interchangeably -- confirm against callers.
    def instance
      self
    end

    # Lazily built logger whose scope label (thread index and task id) is
    # computed by the lambda handed to ScopedLogger.
    def logger
      @logger ||= Tumugi::ScopedLogger.new(-> { "Thread-#{Thread.list.index(Thread.current)}: #{id}" })
    end

    # Logs +msg+ at info level through #logger.
    def log(msg)
      logger.info(msg)
    end

    # If you need to define task dependencies, override in subclass
    def requires
      []
    end

    # Outputs of the required tasks, memoized on first access. The shape
    # mirrors #requires (see #_input).
    def input
      @input ||= _input
    end

    # If you need to define output of task to skip already done task,
    # override in subclass. If not, a task run always.
    def output
      []
    end

    # The work performed by the task; must be implemented by subclasses.
    #
    # @raise [NotImplementedError] always, in this base class
    def run
      raise NotImplementedError, "You must implement #{self.class}##{__method__}"
    end

    # @return [Boolean] true when every required task reports completed?
    def ready?
      list(_requires).all? { |t| t.instance.completed? }
    end

    # A task without outputs is completed when its state is a success state;
    # a task with outputs is completed when every output answers true to exist?.
    def completed?
      outputs = list(output)
      return success? if outputs.empty?

      outputs.all?(&:exist?)
    end

    # @return [Boolean] true when any required task has finished without success
    def requires_failed?
      list(_requires).any? { |t| t.instance.finished? && !t.instance.success? }
    end

    # @param now [Time] the time to test visibility against
    # @return [Boolean] true when dependencies are ready and the task is visible at +now+
    def runnable?(now)
      ready? && visible?(now)
    end

    # @return [Boolean] true when the state is :completed or :skipped
    def success?
      case state
      when :completed, :skipped
        true
      else
        false
      end
    end

    # @return [Boolean] true when the state is :failed or :requires_failed
    def failure?
      case state
      when :failed, :requires_failed
        true
      else
        false
      end
    end

    # @return [Boolean] true once the task has reached a success or failure state
    def finished?
      success? || failure?
    end

    def timeout
      nil # meaning use default timeout
    end

    # Records one more attempt and pushes the earliest next run time back by
    # retry_interval.
    #
    # @return [Boolean] true while the attempt count is still within max_retry
    def retry
      @tries += 1
      @visible_at += @retry_interval
      retriable?
    end

    # @return [Symbol] the current state, read under the task's lock
    def state
      @lock.synchronize { @state }
    end

    # Moves the task into the state associated with +event+ and refreshes
    # #elapsed_time for the current attempt.
    #
    # @param event [Symbol] one of :skip, :start, :pend, :complete, :fail, :requires_fail
    # @return [Symbol] the new state
    # @raise [Tumugi::TumugiError] when the event or the resulting state is unknown
    def trigger!(event)
      @lock.synchronize do
        # Validate before touching any timing data so that a bad event leaves
        # the task exactly as it was.
        next_state = EVENT_TO_STATE.fetch(event) do
          raise Tumugi::TumugiError, "Invalid event: #{event}"
        end
        unless AVAILABLE_STATES.include?(next_state)
          raise Tumugi::TumugiError, "Invalid state: #{next_state}"
        end

        now = Time.now
        # The first event of an attempt stamps its start; every event moves its end.
        timing = (@_elapsed_times[tries] ||= { start: now })
        timing[:end] = now
        @elapsed_time = human_readable_time((timing[:end] - timing[:start]).to_i)
        @state = next_state
      end
    end

    # Event callbacks: one no-op on_<event> hook per entry of Event.all,
    # meant to be overridden in subclasses.
    Event.all.each do |event|
      define_method("on_#{event}") {}
    end

    # Following methods are internal use only

    # Memoized result of #requires.
    def _requires
      @_requires ||= requires
    end

    # Memoized result of #output.
    def _output
      @_output ||= output
    end

    private

    # Collects the outputs of the required tasks, keeping the shape of
    # #requires: nil gives [], an Array gives an Array, a Hash gives a Hash
    # with the same keys, and a single task gives that task's output.
    def _input
      case _requires
      when nil
        []
      when Array
        _requires.map { |t| t.instance._output }
      when Hash
        Hash[_requires.map { |k, t| [k, t.instance._output] }]
      else
        _requires.instance._output
      end
    end

    # @return [Boolean] true when +now+ is at or after the earliest allowed run time
    def visible?(now)
      now >= @visible_at
    end

    # @return [Boolean] true while the number of attempts does not exceed max_retry
    def retriable?
      @tries <= @max_retry
    end
  end
end