jdantonio/concurrent-ruby

View on GitHub
lib/concurrent-ruby/concurrent/async.rb

Summary

Maintainability
A
55 mins
Test Coverage
require 'concurrent/configuration'
require 'concurrent/ivar'
require 'concurrent/synchronization/lockable_object'

module Concurrent

  # A mixin module that provides simple asynchronous behavior to a class,
  # turning it into a simple actor. Loosely based on Erlang's
  # [gen_server](http://www.erlang.org/doc/man/gen_server.html), but without
  # supervision or linking.
  #
  # A more feature-rich {Concurrent::Actor} is also available when the
  # capabilities of `Async` are too limited.
  #
  # ```cucumber
  # Feature:
  #   As a stateful, plain old Ruby class
  #   I want safe, asynchronous behavior
  #   So my long-running methods don't block the main thread
  # ```
  #
  # The `Async` module is a way to mix simple yet powerful asynchronous
  # capabilities into any plain old Ruby object or class, turning each object
  # into a simple Actor. Method calls are processed on a background thread. The
  # caller is free to perform other actions while processing occurs in the
  # background.
  #
  # Method calls to the asynchronous object are made via two proxy methods:
  # `async` (alias `cast`) and `await` (alias `call`). These proxy methods post
  # the method call to the object's background thread and return a "future"
  # which will eventually contain the result of the method call.
  #
  # This behavior is loosely patterned after Erlang's `gen_server` behavior.
  # When an Erlang module implements the `gen_server` behavior it becomes
  # inherently asynchronous. The `start` or `start_link` function spawns a
  # process (similar to a thread but much more lightweight and efficient) and
  # returns the ID of the process. Using the process ID, other processes can
  # send messages to the `gen_server` via the `cast` and `call` methods. Unlike
  # Erlang's `gen_server`, however, `Async` classes do not support linking or
  # supervision trees.
  #
  # ## Basic Usage
  #
  # When this module is mixed into a class, objects of the class become inherently
  # asynchronous. Each object gets its own background thread on which to post
  # asynchronous method calls. Asynchronous method calls are executed in the
  # background one at a time in the order they are received.
  #
  # To create an asynchronous class, simply mix in the `Concurrent::Async` module:
  #
  # ```
  # class Hello
  #   include Concurrent::Async
  #
  #   def hello(name)
  #     "Hello, #{name}!"
  #   end
  # end
  # ```
  #
  # Mixing this module into a class provides each object two proxy methods:
  # `async` and `await`. These methods are thread safe with respect to the
  # enclosing object. The former proxy allows methods to be called
  # asynchronously by posting to the object's internal thread. The latter proxy
  # allows a method to be called synchronously but does so safely with respect
  # to any pending asynchronous method calls and ensures proper ordering. Both
  # methods return a {Concurrent::IVar} which can be inspected for the result
  # of the proxied method call. Calling a method with `async` will return a
  # `:pending` `IVar` whereas `await` will return a `:complete` `IVar`.
  #
  # ```
  # class Echo
  #   include Concurrent::Async
  #
  #   def echo(msg)
  #     print "#{msg}\n"
  #   end
  # end
  #
  # horn = Echo.new
  # horn.echo('zero')      # synchronous, not thread-safe
  #                        # returns the actual return value of the method
  #
  # horn.async.echo('one') # asynchronous, non-blocking, thread-safe
  #                        # returns an IVar in the :pending state
  #
  # horn.await.echo('two') # synchronous, blocking, thread-safe
  #                        # returns an IVar in the :complete state
  # ```
  #
  # ## Let It Fail
  #
  # The `async` and `await` proxy methods have built-in error protection based
  # on Erlang's famous "let it fail" philosophy. Instance methods should not be
  # programmed defensively. When an exception is raised by a delegated method
  # the proxy will rescue the exception, expose it to the caller as the `reason`
  # attribute of the returned future, then process the next method call.
  #
  # ## Calling Methods Internally
  #
  # External method calls should *always* use the `async` and `await` proxy
  # methods. When one method calls another method, the `async` proxy should
  # rarely be used and the `await` proxy should *never* be used.
  #
  # When an object calls one of its own methods using the `await` proxy the
  # second call will be enqueued *behind* the currently running method call.
  # Any attempt to wait on the result will fail as the second call will never
  # run until after the current call completes.
  #
  # Calling a method using the `await` proxy from within a method that was
  # itself called using `async` or `await` will irreversibly deadlock the
  # object. Do *not* do this, ever.
  #
  # ## Instance Variables and Attribute Accessors
  #
  # Instance variables do not need to be thread-safe so long as they are private.
  # Asynchronous method calls are processed in the order they are received and
  # are processed one at a time. Therefore private instance variables can only
  # be accessed by one thread at a time. This is inherently thread-safe.
  #
  # When using private instance variables within asynchronous methods, the best
  # practice is to read the instance variable into a local variable at the start
  # of the method then update the instance variable at the *end* of the method.
  # This way, should an exception be raised during method execution the internal
  # state of the object will not have been changed.
  #
  # ### Reader Attributes
  #
  # The use of `attr_reader` is discouraged. Internal state exposed externally,
  # when necessary, should be done through accessor methods. The instance
  # variables exposed by these methods *must* be thread-safe, or they must be
  # called using the `async` and `await` proxy methods. These two approaches are
  # subtly different.
  #
  # When internal state is accessed via the `async` and `await` proxy methods,
  # the returned value represents the object's state *at the time the call is
  # processed*, which may *not* be the state of the object at the time the call
  # is made.
  #
  # To get the state *at the current* time, irrespective of an enqueued method
  # calls, a reader method must be called directly. This is inherently unsafe
  # unless the instance variable is itself thread-safe, preferably using one
  # of the thread-safe classes within this library. Because the thread-safe
  # classes within this library are internally-locking or non-locking, they can
  # be safely used from within asynchronous methods without causing deadlocks.
  #
  # Generally speaking, the best practice is to *not* expose internal state via
  # reader methods. The best practice is to simply use the method's return value.
  #
  # ### Writer Attributes
  #
  # Writer attributes should never be used with asynchronous classes. Changing
  # the state externally, even when done in the thread-safe way, is not logically
  # consistent. Changes to state need to be timed with respect to all asynchronous
  # method calls which my be in-process or enqueued. The only safe practice is to
  # pass all necessary data to each method as arguments and let the method update
  # the internal state as necessary.
  #
  # ## Class Constants, Variables, and Methods
  #
  # ### Class Constants
  #
  # Class constants do not need to be thread-safe. Since they are read-only and
  # immutable they may be safely read both externally and from within
  # asynchronous methods.
  #
  # ### Class Variables
  #
  # Class variables should be avoided. Class variables represent shared state.
  # Shared state is anathema to concurrency. Should there be a need to share
  # state using class variables they *must* be thread-safe, preferably
  # using the thread-safe classes within this library. When updating class
  # variables, never assign a new value/object to the variable itself. Assignment
  # is not thread-safe in Ruby. Instead, use the thread-safe update functions
  # of the variable itself to change the value.
  #
  # The best practice is to *never* use class variables with `Async` classes.
  #
  # ### Class Methods
  #
  # Class methods which are pure functions are safe. Class methods which modify
  # class variables should be avoided, for all the reasons listed above.
  #
  # ## An Important Note About Thread Safe Guarantees
  #
  # > Thread safe guarantees can only be made when asynchronous method calls
  # > are not mixed with direct method calls. Use only direct method calls
  # > when the object is used exclusively on a single thread. Use only
  # > `async` and `await` when the object is shared between threads. Once you
  # > call a method using `async` or `await`, you should no longer call methods
  # > directly on the object. Use `async` and `await` exclusively from then on.
  #
  # @example
  #
  #   class Echo
  #     include Concurrent::Async
  #
  #     def echo(msg)
  #       print "#{msg}\n"
  #     end
  #   end
  #
  #   horn = Echo.new
  #   horn.echo('zero')      # synchronous, not thread-safe
  #                          # returns the actual return value of the method
  #
  #   horn.async.echo('one') # asynchronous, non-blocking, thread-safe
  #                          # returns an IVar in the :pending state
  #
  #   horn.await.echo('two') # synchronous, blocking, thread-safe
  #                          # returns an IVar in the :complete state
  #
  # @see Concurrent::Actor
  # @see https://en.wikipedia.org/wiki/Actor_model "Actor Model" at Wikipedia
  # @see http://www.erlang.org/doc/man/gen_server.html Erlang gen_server
  # @see http://c2.com/cgi/wiki?LetItCrash "Let It Crash" at http://c2.com/
  module Async

    # @!method self.new(*args, &block)
    #
    #   Instanciate a new object and ensure proper initialization of the
    #   synchronization mechanisms.
    #
    #   @param [Array<Object>] args Zero or more arguments to be passed to the
    #     object's initializer.
    #   @param [Proc] block Optional block to pass to the object's initializer.
    #   @return [Object] A properly initialized object of the asynchronous class.

    # Check for the presence of a method on an object and determine if a given
    # set of arguments matches the required arity.
    #
    # @param [Object] obj the object to check against
    # @param [Symbol] method the method to check the object for
    # @param [Array] args zero or more arguments for the arity check
    #
    # @raise [NameError] the object does not respond to `method` method
    # @raise [ArgumentError] the given `args` do not match the arity of `method`
    #
    # @note This check is imperfect because of the way Ruby reports the arity of
    #   methods with a variable number of arguments. It is possible to determine
    #   if too few arguments are given but impossible to determine if too many
    #   arguments are given. This check may also fail to recognize dynamic behavior
    #   of the object, such as methods simulated with `method_missing`.
    #
    # @see http://www.ruby-doc.org/core-2.1.1/Method.html#method-i-arity Method#arity
    # @see http://ruby-doc.org/core-2.1.0/Object.html#method-i-respond_to-3F Object#respond_to?
    # @see http://www.ruby-doc.org/core-2.1.0/BasicObject.html#method-i-method_missing BasicObject#method_missing
    #
    # @!visibility private
    def self.validate_argc(obj, method, *args)
      argc = args.length
      arity = obj.method(method).arity

      if arity >= 0 && argc != arity
        raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
      elsif arity < 0 && (arity = (arity + 1).abs) > argc
        raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
      end
    end

    # @!visibility private
    def self.included(base)
      base.singleton_class.send(:alias_method, :original_new, :new)
      base.extend(ClassMethods)
      super(base)
    end

    # @!visibility private
    module ClassMethods
      def new(*args, &block)
        obj = original_new(*args, &block)
        obj.send(:init_synchronization)
        obj
      end
      ruby2_keywords :new if respond_to?(:ruby2_keywords, true)
    end
    private_constant :ClassMethods

    # Delegates asynchronous, thread-safe method calls to the wrapped object.
    #
    # @!visibility private
    class AsyncDelegator < Synchronization::LockableObject
      safe_initialization!

      # Create a new delegator object wrapping the given delegate.
      #
      # @param [Object] delegate the object to wrap and delegate method calls to
      def initialize(delegate)
        super()
        @delegate = delegate
        @queue = []
        @executor = Concurrent.global_io_executor
        @ruby_pid = $$
      end

      # Delegates method calls to the wrapped object.
      #
      # @param [Symbol] method the method being called
      # @param [Array] args zero or more arguments to the method
      #
      # @return [IVar] the result of the method call
      #
      # @raise [NameError] the object does not respond to `method` method
      # @raise [ArgumentError] the given `args` do not match the arity of `method`
      def method_missing(method, *args, &block)
        super unless @delegate.respond_to?(method)
        Async::validate_argc(@delegate, method, *args)

        ivar = Concurrent::IVar.new
        synchronize do
          reset_if_forked
          @queue.push [ivar, method, args, block]
          @executor.post { perform } if @queue.length == 1
        end

        ivar
      end

      # Check whether the method is responsive
      #
      # @param [Symbol] method the method being called
      def respond_to_missing?(method, include_private = false)
        @delegate.respond_to?(method) || super
      end

      # Perform all enqueued tasks.
      #
      # This method must be called from within the executor. It must not be
      # called while already running. It will loop until the queue is empty.
      def perform
        loop do
          ivar, method, args, block = synchronize { @queue.first }
          break unless ivar # queue is empty

          begin
            ivar.set(@delegate.send(method, *args, &block))
          rescue => error
            ivar.fail(error)
          end

          synchronize do
            @queue.shift
            return if @queue.empty?
          end
        end
      end

      def reset_if_forked
        if $$ != @ruby_pid
          @queue.clear
          @ruby_pid = $$
        end
      end
    end
    private_constant :AsyncDelegator

    # Delegates synchronous, thread-safe method calls to the wrapped object.
    #
    # @!visibility private
    class AwaitDelegator

      # Create a new delegator object wrapping the given delegate.
      #
      # @param [AsyncDelegator] delegate the object to wrap and delegate method calls to
      def initialize(delegate)
        @delegate = delegate
      end

      # Delegates method calls to the wrapped object.
      #
      # @param [Symbol] method the method being called
      # @param [Array] args zero or more arguments to the method
      #
      # @return [IVar] the result of the method call
      #
      # @raise [NameError] the object does not respond to `method` method
      # @raise [ArgumentError] the given `args` do not match the arity of `method`
      def method_missing(method, *args, &block)
        ivar = @delegate.send(method, *args, &block)
        ivar.wait
        ivar
      end

      # Check whether the method is responsive
      #
      # @param [Symbol] method the method being called
      def respond_to_missing?(method, include_private = false)
        @delegate.respond_to?(method) || super
      end
    end
    private_constant :AwaitDelegator

    # Causes the chained method call to be performed asynchronously on the
    # object's thread. The delegated method will return a future in the
    # `:pending` state and the method call will have been scheduled on the
    # object's thread. The final disposition of the method call can be obtained
    # by inspecting the returned future.
    #
    # @!macro async_thread_safety_warning
    #   @note The method call is guaranteed to be thread safe with respect to
    #     all other method calls against the same object that are called with
    #     either `async` or `await`. The mutable nature of Ruby references
    #     (and object orientation in general) prevent any other thread safety
    #     guarantees. Do NOT mix direct method calls with delegated method calls.
    #     Use *only* delegated method calls when sharing the object between threads.
    #
    # @return [Concurrent::IVar] the pending result of the asynchronous operation
    #
    # @raise [NameError] the object does not respond to the requested method
    # @raise [ArgumentError] the given `args` do not match the arity of
    #   the requested method
    def async
      @__async_delegator__
    end
    alias_method :cast, :async

    # Causes the chained method call to be performed synchronously on the
    # current thread. The delegated will return a future in either the
    # `:fulfilled` or `:rejected` state and the delegated method will have
    # completed. The final disposition of the delegated method can be obtained
    # by inspecting the returned future.
    #
    # @!macro async_thread_safety_warning
    #
    # @return [Concurrent::IVar] the completed result of the synchronous operation
    #
    # @raise [NameError] the object does not respond to the requested method
    # @raise [ArgumentError] the given `args` do not match the arity of the
    #   requested method
    def await
      @__await_delegator__
    end
    alias_method :call, :await

    # Initialize the internal serializer and other stnchronization mechanisms.
    #
    # @note This method *must* be called immediately upon object construction.
    #   This is the only way thread-safe initialization can be guaranteed.
    #
    # @!visibility private
    def init_synchronization
      return self if defined?(@__async_initialized__) && @__async_initialized__
      @__async_initialized__ = true
      @__async_delegator__ = AsyncDelegator.new(self)
      @__await_delegator__ = AwaitDelegator.new(@__async_delegator__)
      self
    end
  end
end