Burgestrand/performer

View on GitHub
lib/performer/task.rb

Summary

Maintainability
A
1 hr
Test Coverage
require "timeout"

class Performer
  # A task is constructed with a callable object (like a proc), and provides ways to:
  #
  # - call the contained object, once and only once
  # - retrieve the value from the execution of the callable, and wait if execution is not finished
  # - cancel a task, and as such wake up all waiting for the task value
  #
  # Furthermore, the Task public API is thread-safe, and a resolved task will never change value.
  #
  # @example constructing a task
  #   task = Task.new(lambda { 1 + 1 })
  #   worker = Thread.new(task, &:call)
  #   task.value # => 2
  class Task
    # Used for the internal task state machine.
    #
    # @api private
    Transitions = {
      idle: { executing: true, cancelled: true },
      executing: { error: true, value: true },
    }

    # Allows easy capturing of all task-specific errors.
    class Error < Performer::Error; end

    # Raised in {Task#value} if the task was cancelled.
    class CancelledError < Error; end

    # Raised from {Task#call} or {Task#cancel} on invariant errors.
    class InvariantError < Error; end

    # Create a new Task from a callable object.
    #
    # @param [#call] callable
    # @param [Array<String>] backtrace
    def initialize(callable, backtrace = nil)
      @callable = callable
      @backtrace = backtrace

      @value_mutex = Mutex.new
      @value_cond = Performer::ConditionVariable.new

      @value = nil
      @value_type = :idle
    end

    # Execute the task. Arguments and block are passed on to the callable.
    #
    # @note A task can only be called once.
    # @note A task can not be called after it has been cancelled.
    # @note A task swallows standard errors during execution, but all other errors are propagated.
    #
    # When execution finishes, all waiting for {#value} will be woken up with the result.
    #
    # @return [Task] self
    def call(*args, &block)
      set(:executing) { nil }

      begin
        value = @callable.call(*args, &block)
      rescue Exception => ex
        slice_length = caller.length + 1
        ex.backtrace.slice!(-slice_length, slice_length)
        ex.backtrace[0] << " (task failed: #{ex.message} (#{ex.class}))" if @backtrace

        set(:error) { ex }
        raise ex unless ex.is_a?(StandardError)
      else
        set(:value) { value }
      end

      self
    end

    # Cancel the task. All waiting for {#value} will be woken up with a {CancelledError}.
    #
    # @note This cannot be done while the task is executing.
    # @note This cannot be done if the task has finished executing.
    #
    # @param [String] message for the cancellation error
    def cancel(message = "task was cancelled")
      backtrace = caller(0)
      backtrace[0] << " (Task#cancel: #{message})"

      set(:cancelled) do
        error = CancelledError.new(message)
        error.set_backtrace(backtrace)
        error
      end
    end

    # Retrieve the value of the task. If the task is not finished, this will block.
    #
    # @example waiting with a timeout and a block
    #   task.value(1) { raise MyOwnError, "Timed out after 1s" }
    #
    # @param [Integer, nil] timeout how long to wait for value before timing out, nil if wait forever
    # @yield if block given, yields instead of raising an error on timeout
    # @raise [TimeoutError] if waiting timeout was reached, and no block was given
    def value(timeout = nil)
      unless done?
        @value_mutex.synchronize do
          @value_cond.wait_until(@value_mutex, timeout) { done? }
        end
      end

      if value?
        return @value
      elsif error? or cancelled?
        raise prepare_backtrace(@value)
      elsif block_given?
        yield
      else
        raise TimeoutError, "retrieving value timed out after #{timeout}s"
      end
    end

    private

    def error?
      @value_type == :error
    end

    def value?
      @value_type == :value
    end

    def cancelled?
      @value_type == :cancelled
    end

    def done?
      value? or error? or cancelled?
    end

    def prepare_backtrace(error)
      error = error.dup

      value_backtrace = caller(1)

      if @backtrace
        error.backtrace.unshift(*value_backtrace)
        error.backtrace.concat(@backtrace)
      else
        error.backtrace.concat(value_backtrace)
      end

      error
    end

    # @param [Symbol] type
    # @yield to set the value
    def set(type)
      @value_mutex.synchronize do
        unless Transitions.fetch(@value_type, {}).has_key?(type)
          raise InvariantError, "transition from #{@value_type} to #{type} is not allowed"
        end

        @value_type = type
        @value = yield
        @value_cond.broadcast if done?

        @value
      end
    end
  end
end