celluloid/celluloid

View on GitHub
lib/celluloid/future.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Celluloid
  # Celluloid::Future objects allow methods and blocks to run in the
  # background, their values requested later
  class Future
    def self.new(*args, &block)
      return super unless block

      future = new
      # task = Thread.current[:celluloid_task]
      # actor = Thread.current[:celluloid_actor]
      Internals::ThreadHandle.new(Celluloid.actor_system, :future) do
        begin
          # Thread.current[:celluloid_task] = task
          # Thread.current[:celluloid_actor] = actor
          call = Call::Sync.new(future, :call, args)
          call.dispatch(block)
        rescue
          # Exceptions in blocks will get raised when the value is retrieved
        end
      end
      future
    end

    attr_reader :address

    def initialize(&block)
      @address = Celluloid.uuid
      @mutex = Mutex.new
      @ready = false
      @result = nil
      @forwards = nil
      @cancelled = false

      if block
        @call = Call::Sync.new(self, :call, args)
        Celluloid.internal_pool.get do
          begin
            @call.dispatch(block)
          rescue
            # Exceptions in blocks will get raised when the value is retrieved
          end
        end
      else
        @call = nil
      end
    end

    # Execute the given method in future context
    def execute(receiver, method, args, block)
      @mutex.synchronize do
        raise "already calling" if @call
        @call = Call::Sync.new(self, method, args, block)
      end

      receiver << @call
    end

    # Check if this future has a value yet
    def ready?
      @ready
    end

    # Obtain the value for this Future
    def value(timeout = nil)
      ready = result = nil

      begin
        @mutex.lock

        if @ready
          ready = true
          result = @result
        else
          case @forwards
          when Array
            @forwards << Celluloid.mailbox
          when NilClass
            @forwards = Celluloid.mailbox
          else
            @forwards = [@forwards, Celluloid.mailbox]
          end
        end
      ensure
        @mutex.unlock
      end

      unless ready
        result = Celluloid.receive(timeout) do |msg|
          msg.is_a?(Future::Result) && msg.future == self
        end
      end

      if result
        result.respond_to?(:value) ? result.value : result
      else
        raise TimedOut, "Timed out"
      end
    end
    alias call value

    # Signal this future with the given result value
    def signal(value)
      return if @cancelled
      result = Result.new(value, self)

      @mutex.synchronize do
        raise "the future has already happened!" if @ready

        if @forwards
          @forwards.is_a?(Array) ? @forwards.each { |f| f << result } : @forwards << result
        end

        @result = result
        @ready = true
      end
    end
    alias << signal

    def cancel(error)
      response = Internals::Response::Error.new(@call, error)
      signal response
      @mutex.synchronize do
        @cancelled = true
      end
    end

    # Inspect this Celluloid::Future
    alias inspect to_s

    # Wrapper for result values to distinguish them in mailboxes
    class Result
      attr_reader :future

      def initialize(result, future)
        @result = result
        @future = future
      end

      def value
        @result.value
      end
    end
  end
end