celluloid/celluloid

View on GitHub
lib/celluloid/task.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Celluloid
  # Tasks are interruptable/resumable execution contexts used to run methods
  class Task
    # Obtain the current task
    def self.current
      Thread.current[:celluloid_task] || raise(NotTaskError, "not within a task context")
    end

    # Suspend the running task, deferring to the scheduler
    def self.suspend(status)
      Task.current.suspend(status)
    end

    attr_reader :type, :meta, :status
    attr_accessor :chain_id, :guard_warnings

    # Create a new task
    def initialize(type, meta)
      @type     = type
      @meta     = meta
      @status   = :new

      @exclusive         = false
      @dangerous_suspend = @meta ? @meta.dup.delete(:dangerous_suspend) : false
      @guard_warnings    = false

      actor     = Thread.current[:celluloid_actor]
      @chain_id = Internals::CallChain.current_id

      raise NotActorError, "can't create tasks outside of actors" unless actor
      guard "can't create tasks inside of tasks" if Thread.current[:celluloid_task]

      create do
        begin
          @status = :running
          actor.setup_thread

          name_current_thread thread_metadata

          Thread.current[:celluloid_task] = self
          Internals::CallChain.current_id = @chain_id

          actor.tasks << self
          yield
        rescue TaskTerminated
          # Task was explicitly terminated
        ensure
          name_current_thread nil
          @status = :dead
          actor.tasks.delete self
        end
      end
    end

    def create(&_block)
      raise "Implement #{self.class}#create"
    end

    # Suspend the current task, changing the status to the given argument
    def suspend(status)
      raise "Cannot suspend while in exclusive mode" if exclusive?
      raise "Cannot suspend a task from outside of itself" unless Task.current == self

      @status = status

      if Internals::Logger.level == Logger::DEBUG && @dangerous_suspend
        Internals::Logger.with_backtrace(caller[2...8]) do |logger|
          logger.warn "Dangerously suspending task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
        end
      end

      value = signal

      @status = :running
      raise value if value.is_a?(Celluloid::Interruption)
      value
    end

    # Resume a suspended task, giving it a value to return if needed
    def resume(value = nil)
      guard "Cannot resume a task from inside of a task" if Thread.current[:celluloid_task]
      if running?
        deliver(value)
      else
        # rubocop:disable Metrics/LineLength
        Internals::Logger.warn "Attempted to resume a dead task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
        # rubocop:enable Metrics/LineLength
      end
      nil
    end

    # Execute a code block in exclusive mode.
    def exclusive
      if @exclusive
        yield
      else
        begin
          @exclusive = true
          yield
        ensure
          @exclusive = false
        end
      end
    end

    # Terminate this task
    def terminate
      raise "Cannot terminate an exclusive task" if exclusive?

      if running?
        if Internals::Logger.level == Logger::DEBUG
          Internals::Logger.with_backtrace(backtrace) do |logger|
            type = @dangerous_suspend ? :warn : :debug
            logger.send(type, "Terminating task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}")
          end
        end

        exception = TaskTerminated.new("task was terminated")
        exception.set_backtrace(caller)
        resume exception
      else
        raise DeadTaskError, "task is already dead"
      end
    end

    # Is this task running in exclusive mode?
    def exclusive?
      @exclusive
    end

    def backtrace; end

    # Is the current task still running?
    def running?
      @status != :dead
    end

    # Nicer string inspect for tasks
    def inspect
      "#<#{self.class}:0x#{object_id.to_s(16)} @type=#{@type.inspect}, @meta=#{@meta.inspect}, @status=#{@status.inspect}>"
    end

    def guard(message)
      if @guard_warnings
        Internals::Logger.warn message if Internals::Logger.level == Logger::DEBUG
      else
        raise message if Internals::Logger.level == Logger::DEBUG
      end
    end

    private

    def name_current_thread(new_name)
      return unless RUBY_PLATFORM == "java"
      current_java_thread = java.lang.Thread.currentThread
      if new_name.nil?
        new_name = Thread.current[:celluloid_original_thread_name]
        Thread.current[:celluloid_original_thread_name] = nil
      else
        Thread.current[:celluloid_original_thread_name] = current_java_thread.get_name
      end
      current_java_thread.set_name(new_name)
    end

    def thread_metadata
      method = @meta && @meta[:method_name] || "<no method>"
      klass = Thread.current[:celluloid_actor] &&
              Thread.current[:celluloid_actor].behavior.subject.bare_object.class ||
              "<no actor>"
      format("[Celluloid] %s#%s", klass, method)
    end
  end
end