ruby-concurrency/concurrent-ruby

View on GitHub
lib/concurrent-ruby-edge/concurrent/actor/core.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require 'set'
require 'concurrent/actor/type_check'
require 'concurrent/concern/logging'
require 'concurrent/executors'
require 'concurrent/synchronization/lockable_object'

module Concurrent
  module Actor
    # Core of the actor.
    # @note Whole class should be considered private. An user should use {Context}s and {Reference}s only.
    # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate
    #   that would eat up all threads in task pool and deadlock
    class Core < Synchronization::LockableObject
      include TypeCheck
      include Concern::Logging

      # @!attribute [r] reference
      #   Reference to this actor which can be safely passed around.
      #   @return [Reference]
      # @!attribute [r] name
      #   The name of actor instance, it should be uniq (not enforced). Allows easier orientation
      #   between actor instances.
      #   @return [String]
      # @!attribute [r] path
      #   Path of this actor. It is used for easier orientation and logging.
      #   Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
      #   e.g. `/an_actor/its_child`.
      #   @return [String]
      # @!attribute [r] executor
      #   Executor which is used to process messages.
      #   @return [Executor]
      # @!attribute [r] actor_class
      #   A subclass of {AbstractContext} representing Actor's behaviour.
      #   @return [Context]
      attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

      # @option opts [String] name
      # @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour
      # @option opts [Array<Object>] args arguments for actor_class instantiation
      # @option opts [Executor] executor, default is `global_io_executor`
      # @option opts [true, false] link, atomically link the actor to its parent (default: true)
      # @option opts [Class] reference a custom descendant of {Reference} to use
      # @option opts [Array<Array(Behavior::Abstract, Array<Object>)>] behaviour_definition, array of pairs
      #   where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition}
      # @option opts [ResolvableFuture, nil] initialized, if present it'll be set or failed after {Context} initialization
      # @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning )
      # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,
      #   can be used to hook actor instance to any logging system, see {Concurrent::Concern::Logging}
      # @param [Proc] block for class instantiation
      def initialize(opts = {}, &block)
        super(&nil)
        synchronize { ns_initialize(opts, &block) }
      end

      # A parent Actor. When actor is spawned the {Actor.current} becomes its parent.
      # When actor is spawned from a thread outside of an actor ({Actor.current} is nil) {Actor.root} is assigned.
      # @return [Reference, nil]
      def parent
        @parent_core && @parent_core.reference
      end

      # @see AbstractContext#dead_letter_routing
      def dead_letter_routing
        @context.dead_letter_routing
      end

      # @return [Array<Reference>] of children actors
      def children
        guard!
        @children.to_a
      end

      # @api private
      def add_child(child)
        guard!
        Type! child, Reference
        @children.add child
        nil
      end

      # @api private
      def remove_child(child)
        guard!
        Type! child, Reference
        @children.delete child
        nil
      end

      # is executed by Reference scheduling processing of new messages
      # can be called from other alternative Reference implementations
      # @param [Envelope] envelope
      def on_envelope(envelope)
        log(DEBUG) { "is  #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
        schedule_execution do
          log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender} - processing" }
          process_envelope envelope
        end
        nil
      end

      # ensures that we are inside of the executor
      def guard!
        unless Actor.current == reference
          raise "can be called only inside actor #{reference} but was #{Actor.current}"
        end
      end

      def log(level, message = nil, &block)
        super level, @path, message, &block
      end

      # Schedules blocks to be executed on executor sequentially,
      # sets Actress.current
      def schedule_execution
        @serialized_execution.post(@executor) do
          synchronize do
            begin
              Thread.current[:__current_actor__] = reference
              yield
            rescue => e
              log FATAL, e
            ensure
              Thread.current[:__current_actor__] = nil
            end
          end
        end

        nil
      end

      def broadcast(public, event)
        log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
        @first_behaviour.on_event(public, event)
      end

      # @param [Class] behaviour_class
      # @return [Behaviour::Abstract, nil] based on behaviour_class
      def behaviour(behaviour_class)
        @behaviours[behaviour_class]
      end

      # @param [Class] behaviour_class
      # @return [Behaviour::Abstract] based on behaviour_class
      # @raise [KeyError] when no behaviour
      def behaviour!(behaviour_class)
        @behaviours.fetch behaviour_class
      end

      # @api private
      def allocate_context
        @context = @context_class.allocate
      end

      # @api private
      def build_context
        @context.send :initialize_core, self
        @context.send :initialize, *@args, &@block
      end

      # @api private
      def process_envelope(envelope)
        @first_behaviour.on_envelope envelope
      end

      private

      def ns_initialize(opts, &block)
        @mailbox              = ::Array.new
        @serialized_execution = SerializedExecution.new
        @children             = Set.new

        @context_class = Child! opts.fetch(:class), AbstractContext
        allocate_context

        @executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService

        @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
        @name      = (Type! opts.fetch(:name), String, Symbol).to_s

        parent       = opts[:parent]
        @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
        if @parent_core.nil? && @name != '/'
          raise 'only root has no parent'
        end

        @path   = @parent_core ? File.join(@parent_core.path, @name) : @name
        @logger = opts[:logger]

        @parent_core.add_child reference if @parent_core

        initialize_behaviours opts

        @args       = opts.fetch(:args, [])
        @block      = block
        initialized = Type! opts[:initialized], Promises::ResolvableFuture, NilClass

        schedule_execution do
          begin
            build_context
            initialized.fulfill reference if initialized
            log DEBUG, 'spawned'
          rescue => ex
            log ERROR, ex
            @first_behaviour.terminate!
            initialized.reject ex if initialized
          end
        end
      end

      def initialize_behaviours(opts)
        @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, ::Array).each do |(behaviour, _)|
          Child! behaviour, Behaviour::Abstract
        end
        @behaviours           = {}
        @first_behaviour      = @behaviour_definition.reverse.
            reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) }
      end
    end
  end
end