jdantonio/concurrent-ruby

View on GitHub
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb

Summary

Maintainability
F
6 days
Test Coverage
require 'set'
require 'concurrent/atomic/count_down_latch'
require 'concurrent/concern/logging'
require 'concurrent/edge/channel'
require 'concurrent/errors'
require 'concurrent/promises'
require 'concurrent/synchronization/object'

module Concurrent

  # This module provides actor abstraction that has same behaviour as Erlang actor.
  #
  # {include:file:docs-source/erlang_actor.out.md}
  # @!macro warn.edge
  module ErlangActor

    # TODO (pitr-ch 04-Feb-2019): mode documentation.
    # TODO (pitr-ch 21-Jan-2019): actor on promises should not call blocking calls like mailbox.pop or tell
    # it's fine for a actor on thread and event based though
    # TODO (pitr-ch 17-Jan-2019): blocking actor should react to signals?
    # e.g. override sleep to wait for signal with a given timeout?
    # what about other blocking stuff
    # def sleep(time)
    #   raise NotImplementedError
    # end
    #
    # def sleep(time)
    #   raise NotImplementedError
    #   finish = Concurrent.monotonic_time + time
    #   while true
    #     now = Concurrent.monotonic_time
    #     if now < finish
    #       message = @Mailbox.pop_matching(AbstractSignal, finish - now)
    #     else
    #     end
    #   end
    # end
    # TODO (pitr-ch 28-Jan-2019): improve matching support, take inspiration and/or port Algebrick ideas, push ANY and similar further up the namespace

    # The public reference of the actor which can be stored and passed around.
    # Nothing else of the actor should be exposed.
    # {Functions.spawn_actor} and {Environment#spawn} return the pid.
    class Pid < Synchronization::Object
      # TODO (pitr-ch 06-Feb-2019): when actor terminates, release it from memory keeping just pid

      # The actor is asynchronously told a message.
      # The method returns immediately unless
      # the actor has bounded mailbox and there is no more space for the message.
      # Then the method blocks current thread until there is space available.
      # This is useful for backpressure.
      #
      # @param [Object] message
      # @param [Numeric] timeout the maximum time in second to wait
      # @return [self, true, false]
      #   self if timeout was nil, false on timing out and true if told in time.
      def tell(message, timeout = nil)
        # The pid only forwards; delivery is done by the actor it references.
        @Actor.tell(message, timeout)
      end

      # Same as {#tell} but represented as a {Promises::Future}.
      # @param [Object] message
      # @return [Promises::Future(self)]
      def tell_op(message)
        # Forwarded to the referenced actor, which builds the future.
        @Actor.tell_op message
      end

      # The actor is asked the message and blocks until a reply is available,
      # which is returned by the method.
      # If the reply is a rejection then the method raises it.
      #
      # If the actor does not call {Environment#reply} or {Environment#reply_resolution}
      # the method will raise NoReply error.
      # If the actor is terminated it will raise NoActor.
      # Therefore the ask is never left unanswered and blocking.
      #
      # @param [Object] message
      # @param [Numeric] timeout the maximum time in second to wait
      # @param [Object] timeout_value the value returned on timeout
      # @return [Object, timeout_value] reply to the message
      # @raise [NoReply, NoActor]
      def ask(message, timeout = nil, timeout_value = nil)
        # Forwarded to the referenced actor, which performs the blocking ask.
        @Actor.ask(message, timeout, timeout_value)
      end

      # Same as {#ask} but represented as a {Promises::Future}.
      # @param [Object] message
      # @param [Promises::ResolvableFuture] probe
      #   a resolvable future which is resolved with the reply.
      # @return [Promises::Future(Object)] reply to the message
      def ask_op(message, probe = Promises.resolvable_future)
        # Forwarded to the referenced actor together with the probe to resolve.
        @Actor.ask_op(message, probe)
      end

      # @!macro erlang_actor.terminated
      #   @return [Promises::Future] a future which is resolved with
      #     the final result of the actor that is either the reason for
      #     termination or a value if terminated normally.
      def terminated
        # The termination future is owned by the actor; the pid only exposes it.
        @Actor.terminated()
      end

      # @return [#to_s, nil] optional name of the actor
      def name
        # Stored once by the constructor; returned as given.
        @Name
      end

      # @return [String] string representation: the default object representation
      #   extended with the optional name and the current state of the actor
      # @raise [RuntimeError] if the termination future reports an unknown state
      def to_s
        original = super
        # read the future once so state and value/reason come from the same object
        future   = terminated
        state    = case future.state
                   when :pending
                     'running'
                   when :fulfilled
                     "terminated normally with #{future.value}"
                   when :rejected
                     "terminated because of #{future.reason}"
                   else
                     # A bare `raise` would re-raise the exception currently being handled
                     # when to_s is called from inside a rescue block, so raise explicitly.
                     raise "unexpected state of the termination future: #{future.state.inspect}"
                   end
        # drop the closing '>' of the default representation, append the details, close it again
        [original[0..-2], *@Name, state].join(' ') << '>'
      end

      alias_method :inspect, :to_s

      private

      safe_initialization!

      # @param [Object] actor the actor this pid delegates to
      # @param [#to_s, nil] name optional name of the actor
      def initialize(actor, name)
        # Call the parent initializer, as the other Synchronization::Object
        # subclasses in this file do.
        super()
        @Actor = actor
        @Name  = name
      end
    end

    # An object representing instance of a monitor, created with {Environment#monitor}.
    # Carries no state; instances are only distinguished by object identity.
    class Reference; end

    # A class providing environment and methods for actor bodies to run in.
    class Environment < Synchronization::Object
      safe_initialization!

      # @!macro erlang_actor.terminated
      def terminated
        # Delegates to the actor running this environment.
        @Actor.terminated()
      end

      # @return [Pid] the pid of this actor
      def pid
        # Delegates to the actor running this environment.
        @Actor.pid()
      end

      # @return [#to_s] the name of the actor if provided to spawn method
      def name
        # The name is kept on the pid, not on the environment.
        own_pid = pid
        own_pid.name
      end

      # @return [true, false] does this actor trap exit messages?
      # @see http://www1.erlang.org/doc/man/erlang.html#process_flag-2
      def traps?
        # Delegates to the actor, which holds the flag.
        @Actor.traps?()
      end

      # When trap is set to true,
      # exit signals arriving to a actor are converted to {Terminated} messages,
      # which can be received as ordinary messages.
      # If trap is set to false,
      # the actor exits
      # if it receives an exit signal other than normal
      # and the exit signal is propagated to its linked actors.
      # Application actors should normally not trap exits.
      #
      # @param [true, false] value
      # @return [true, false] the old value of the flag
      # @see http://www1.erlang.org/doc/man/erlang.html#process_flag-2
      def trap(value = true)
        # Delegates to the actor, which stores the flag and returns the old one.
        @Actor.trap value
      end

      # Helper for constructing a {#receive} rules
      # @see #receive
      # @example
      #   receive on(Numeric) { |v| v.succ },
      #           on(ANY) { terminate :bad_message }
      def on(matcher, value = nil, &block)
        # The rule itself is built by the actor.
        @Actor.on(matcher, value, &block)
      end

      # Receive a message.
      #
      # @param [::Array(), ::Array(#===), ::Array<::Array(#===, Proc)>] rules
      #   *   No rule - `receive`, `receive {|m| m.to_s}`
      #   *   or single rule which can be combined with the supplied block -
      #       `receive(Numeric)`, `receive(Numeric) {|v| v.succ}`
      #   *   or array of matcher-proc pairs -
      #       `receive on(Numeric) { |v| v*2 }, on(Symbol) { |c| do_command c }`
      # @param [Numeric] timeout
      #   how long it should wait for the message
      # @param [Object] timeout_value
      #   if rule `on(TIMEOUT) { do_something }` is not specified
      #   then timeout_value is returned.
      # @return [Object, nothing]
      #   depends on type of the actor.
      #   On thread it blocks until message is available
      #   then it returns the message (or a result of a called block).
      #   On pool it stops executing and continues with a given block
      #   when message becomes available.
      # @param [Hash] options
      #   other options specific by type of the actor
      # @option options [true, false] :keep
      #   Keep the rules and repeatedly call the associated blocks,
      #   until receive is called again.
      # @yield [message] block
      #   to process the message
      #   if single matcher is supplied
      # @yieldparam [Object] message the received message
      # @see ErlangActor Receiving chapter in the ErlangActor examples
      def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
        # Everything is handed over unchanged; the actor implements the receive.
        @Actor.receive(
          *rules,
          timeout:       timeout,
          timeout_value: timeout_value,
          **options,
          &block
        )
      end

      # Creates a link between the calling actor and another actor,
      # if there is not such a link already.
      # If a actor attempts to create a link to itself, nothing is done. Returns true.
      #
      # If pid does not exist,
      # the behavior of the method depends on
      # if the calling actor is trapping exits or not (see {#trap}):
      # * If the calling actor is not trapping exits link raises with {NoActor}.
      # * Otherwise, if the calling actor is trapping exits, link returns true,
      #   but an exit signal with reason noproc is sent to the calling actor.
      #
      # @return [true]
      # @raise [NoActor]
      # @see http://www1.erlang.org/doc/man/erlang.html#link-1
      def link(pid)
        # Linking is performed by the actor.
        @Actor.link pid
      end

      # Removes the link, if there is one,
      # between the calling actor and the actor referred to by pid.
      #
      # Returns true and does not fail, even if there is no link to Id, or if Id does not exist.
      #
      # Once unlink(pid) has returned
      # it is guaranteed
      # that the link between the caller and the actor referred to by pid
      # has no effect on the caller in the future (unless the link is setup again).
      # If caller is trapping exits,
      # an {Terminated} message due to the link might have been placed
      # in the caller's message queue prior to the call, though.
      #
      # Note, the {Terminated} message can be the result of the link,
      # but can also be the result of calling #terminate method externally.
      # Therefore, it may be appropriate to cleanup the message queue
      # when trapping exits after the call to unlink, as follow:
      # ```ruby
      # receive on(And[Terminated, -> e { e.pid == pid }], true), timeout: 0
      # ```
      #
      # @return [true]
      def unlink(pid)
        # Unlinking is performed by the actor.
        @Actor.unlink pid
      end

      # @!visibility private
      # @return [true, false]
      def linked?(pid)
        # Answered by the actor, which keeps the set of links.
        @Actor.linked?(pid)
      end

      # The calling actor starts monitoring actor with given pid.
      #
      # A {DownSignal} message will be sent to the monitoring actor
      # if the actor with given pid dies,
      # or if the actor with given pid does not exist.
      #
      # The monitoring is turned off either
      # when the {DownSignal} message is sent, or when {#demonitor} is called.
      #
      # Making several calls to monitor for the same pid is not an error;
      # it results in as many, completely independent, monitorings.
      #
      # @return [Reference]
      def monitor(pid)
        # Monitoring is set up by the actor, which returns the reference.
        @Actor.monitor pid
      end

      # If MonitorRef is a reference which the calling actor obtained by calling {#monitor},
      # this monitoring is turned off.
      # If the monitoring is already turned off, nothing happens.
      #
      # Once demonitor has returned it is guaranteed that no {DownSignal} message
      # due to the monitor will be placed in the caller's message queue in the future.
      # A {DownSignal} message might have been placed in the caller's message queue prior to the call, though.
      # Therefore, in most cases, it is advisable to remove such a 'DOWN' message from the message queue
      # after monitoring has been stopped.
      # `demonitor(reference, :flush)` can be used if this cleanup is wanted.
      #
      # The behavior of this method can be viewed as two combined operations:
      # asynchronously send a "demonitor signal" to the monitored actor and
      # ignore any future results of the monitor.
      #
      # Failure: It is an error if reference refers to a monitoring started by another actor.
      # In that case it may raise an ArgumentError or go unnoticed.
      #
      # Options:
      # *   `:flush` - Remove (one) {DownSignal} message,
      #     if there is one, from the caller's message queue after monitoring has been stopped.
      #     Calling `demonitor(reference, :flush)` is equivalent to the following, but more efficient:
      #     ```ruby
      #     demonitor(reference)
      #     receive on(And[DownSignal, -> d { d.reference == reference}], true), timeout: 0, timeout_value: true
      #     ```
      #
      # *   `info`
      #     The returned value is one of the following:
      #
      #     -   `true` - The monitor was found and removed.
      #         In this case no {DownSignal} message due to this monitor have been
      #         nor will be placed in the message queue of the caller.
      #     -   `false` - The monitor was not found and could not be removed.
      #         This is probably because someone already has placed a {DownSignal} message
      #         corresponding to this monitor in the caller's message queue.
      #
      #     If the info option is combined with the flush option,
      #     `false` will be returned if a flush was needed; otherwise, `true`.
      #
      # @param [Reference] reference
      # @param [:flush, :info] options
      # @return [true, false]
      def demonitor(reference, *options)
        # Options are passed through as given; the actor validates them.
        @Actor.demonitor reference, *options
      end

      # @!visibility private
      def monitoring?(reference)
        # Answered by the actor, which keeps the active monitorings.
        @Actor.monitoring?(reference)
      end

      # Creates an actor.
      #
      # @param [Object] args arguments for the actor body
      # @param [:on_thread, :on_pool] type
      #   of the actor to be created.
      # @param [Channel] channel
      #   The mailbox of the actor, by default it has unlimited capacity.
      #   Creating the actor with a bounded queue is useful to create backpressure.
      #   The channel can be shared with other abstractions
      #   but actor has to be the only consumer
      #   otherwise internal signals could be lost.
      # @param [Environment, Module] environment
      #   A class which is used to run the body of the actor in.
      #   It can either be a child of {Environment} or a module.
      #   Module is extended to a new instance of environment,
      #   therefore if there is many actors with this module
      #   it is better to create a class and use it instead.
      # @param [#to_s] name of the actor.
      #   Available by {Pid#name} or {Environment#name} and part of {Pid#to_s}.
      # @param [true, false] link
      #   the created actor is atomically created and linked with the calling actor
      # @param [true, false] monitor
      #   the created actor is atomically created and monitored by the calling actor
      # @param [ExecutorService] executor
      #   The executor service to use to execute the actor on.
      #   Applies only to :on_pool actor type.
      # @yield [*args] the body of the actor.
      #   When actor is spawned this block is evaluated
      #   until it terminates.
      #   The on-thread actor requires a block.
      #   The on-pool actor has a default `-> { start }`,
      #   therefore if no block is given it executes a #start method
      #   which needs to be provided with environment.
      # @return [Pid, ::Array(Pid, Reference)] a pid or a pid-reference pair when monitor is true
      # @see http://www1.erlang.org/doc/man/erlang.html#spawn-1
      # @see http://www1.erlang.org/doc/man/erlang.html#spawn_link-1
      # @see http://www1.erlang.org/doc/man/erlang.html#spawn_monitor-1
      def spawn(*args,
                type: @Actor.class,
                channel: Promises::Channel.new,
                environment: Environment,
                name: nil,
                executor: default_executor,
                link: false,
                monitor: false,
                &body)

        # Collect the resolved options and hand them over unchanged;
        # creation, linking and monitoring are done by the actor.
        options = {
          type:        type,
          channel:     channel,
          environment: environment,
          name:        name,
          executor:    executor,
          link:        link,
          monitor:     monitor
        }
        @Actor.spawn(*args, **options, &body)
      end

      # Shortcut for fulfilling the reply, same as `reply_resolution true, value, nil`.
      # @example
      #   actor = Concurrent::ErlangActor.spawn(:on_thread) { reply receive * 2 }
      #   actor.ask 2 #=> 4
      # @param [Object] value
      # @return [true, false] did the sender ask, and was it resolved
      def reply(value)
        # TODO (pitr-ch 08-Feb-2019): consider adding reply? which returns true,false if success, reply method will always return value
        # A plain reply is a fulfilled resolution without a reason.
        reply_resolution(true, value, nil)
      end

      # Reply to the sender of the message currently being processed
      # if the actor was asked instead of told.
      # The reply is stored in a {Promises::ResolvableFuture}
      # so the arguments are same as for {Promises::ResolvableFuture#resolve} method.
      #
      # The reply may timeout, then this will fail with false.
      #
      # @param [true, false] fulfilled
      # @param [Object] value
      # @param [Object] reason
      #
      # @example
      #   actor = Concurrent::ErlangActor.spawn(:on_thread) { reply_resolution true, receive * 2, nil }
      #   actor.ask 2 #=> 4
      #
      # @return [true, false] did the sender ask, and was it resolved before it timed out?
      def reply_resolution(fulfilled = true, value = nil, reason = nil)
        # The actor holds the probe of the message being processed.
        @Actor.reply_resolution fulfilled, value, reason
      end

      # If pid **is not** provided stops the execution of the calling actor
      # with the exit reason.
      #
      # If pid **is** provided,
      # it sends an exit signal with exit reason to the actor identified by pid.
      #
      # The following behavior apply
      # if `reason` is any object except `:normal` or `:kill`.
      # If pid is not trapping exits,
      # pid itself will exit with exit reason.
      # If pid is trapping exits,
      # the exit signal is transformed into a message {Terminated}
      # and delivered to the message queue of pid.
      #
      # If reason is the Symbol `:normal`, pid will not exit.
      # If it is trapping exits, the exit signal is transformed into a message {Terminated}
      # and delivered to its message queue.
      #
      # If reason is the Symbol `:kill`, that is if `exit(pid, :kill)` is called,
      # an untrappable exit signal is sent to pid which will unconditionally exit
      # with exit reason `:killed`.
      #
      # Since evaluating this function causes the process to terminate, it has no return value.
      #
      # @param [Pid] pid
      # @param [Object, :normal, :kill] reason
      # @param [Object] value
      # @return [nothing]
      # @see http://www1.erlang.org/doc/man/erlang.html#error-1
      # @see http://www1.erlang.org/doc/man/erlang.html#error-2
      def terminate(pid = nil, reason, value: nil)
        # The actor decides between terminating itself and signalling pid.
        @Actor.terminate(pid, reason, value: value)
      end

      # @return [ExecutorService] a default executor which is picked by spawn call
      def default_executor
        # Set once by the constructor.
        @DefaultExecutor
      end

      private

      def initialize(actor, executor)
        super()
        # the executor offered as the default to actors spawned from this environment
        @DefaultExecutor = executor
        # the actor every public method of the environment delegates to
        @Actor           = actor
      end
    end

    # A module containing entry functions to actors like spawn_actor, terminate_actor.
    # It can be included in environments working with actors.
    # @example
    #   include Concurrent::ErlangActor::Functions
    #   actor = spawn_actor :on_pool do
    #     receive { |data| process data }
    #   end
    # @see FunctionShortcuts
    module Functions
      # Creates an actor. Same as {Environment#spawn} but lacks link and monitor options.
      # @param [Object] args
      # @param [:on_thread, :on_pool] type
      # @param [Channel] channel
      # @param [Environment, Module] environment
      # @param [#to_s] name of the actor
      # @param [ExecutorService] executor of the actor
      # @return [Pid]
      # @see Environment#spawn
      def spawn_actor(*args,
                      type:,
                      channel: Promises::Channel.new,
                      environment: Environment,
                      name: nil,
                      executor: default_actor_executor,
                      &body)

        # Create the actor, start it with the given arguments and body,
        # and expose only its pid.
        created = ErlangActor.create(type, channel, environment, name, executor)
        created.run(*args, &body)
        created.pid
      end

      # Same as {Environment#terminate}, but it requires pid.
      # @param [Pid] pid
      # @param [Object, :normal, :kill] reason
      # @return [true]
      def terminate_actor(pid, reason)
        # The signal carries no sender (nil) since it is sent from outside of any actor.
        signal = reason == :kill ? Kill.new(nil) : Terminate.new(nil, reason, false)
        pid.tell signal
        true
      end

      # @return [ExecutorService] the default executor service for actors
      def default_actor_executor
        # Actors use the general default unless this method is overridden.
        default_executor()
      end

      # @return [ExecutorService] the default executor service,
      #   may be shared by other abstractions
      def default_executor
        # Returned as a symbol, not as an executor instance.
        :io
      end
    end

    # Contains shortcuts for methods in {Functions}.
    module FunctionShortcuts
      # Short name for {Functions#spawn_actor}; all arguments, keywords
      # and the body are passed through unchanged.
      # @return [Pid]
      def spawn(*args, **kwargs, &body)
        spawn_actor(*args, **kwargs, &body)
      end

      # Short name for {Functions#terminate_actor}.
      # @param [Pid] pid
      # @param [Object, :normal, :kill] reason
      # @return [true]
      def terminate(pid, reason)
        terminate_actor pid, reason
      end
    end

    # Make the entry functions and their shortcuts callable on the module itself,
    # e.g. `ErlangActor.spawn_actor` and `ErlangActor.spawn`.
    extend Functions
    extend FunctionShortcuts
    extend Concern::Logging

    # A named marker object; its name is used as the string representation.
    class Token
      # @param [String] name returned by #to_s and #inspect
      def initialize(name)
        @name = name
      end

      # @return [String] the name given to the constructor
      def to_s
        @name
      end

      alias inspect to_s
    end

    private_constant :Token

    # Internal marker tokens, hidden from code outside of this module.
    JUMP      = Token.new('JUMP')
    TERMINATE = Token.new('TERMINATE')
    RECEIVE   = Token.new('RECEIVE')
    NOTHING   = Token.new('NOTHING')

    private_constant :JUMP, :TERMINATE, :RECEIVE, :NOTHING

    # These constants are useful
    # where the body of an actor is defined.
    # For convenience they are provided in this module for including.
    # @example
    #   include Concurrent::ErlangActor::EnvironmentConstants
    #   actor = Concurrent::ErlangActor.spawn(:on_thread) do
    #     receive on(Numeric) { |v| v.succ },
    #             on(ANY) { terminate :bad_message },
    #             on(TIMEOUT) { terminate :no_message },
    #             timeout: 1
    #   end
    module EnvironmentConstants
      # Unique identifier of a timeout, singleton.
      TIMEOUT = Token.new 'TIMEOUT'
      # A singleton which matches anything using #=== method
      ANY = Promises::Channel::ANY

      # Base of the combining matchers; it only stores the given matchers.
      class AbstractLogicOperationMatcher
        class << self
          # Bracket constructor, `Matcher[a, b]` is the same as `Matcher.new(a, b)`.
          def [](*matchers)
            new(*matchers)
          end
        end

        # @param [::Array<#===>] matchers the matchers to combine
        def initialize(*matchers)
          @matchers = matchers
        end
      end

      # Combines matchers into one which matches if all match.
      # @example
      #   And[Numeric, -> v { v >= 0 }] === 1  # => true
      #   And[Numeric, -> v { v >= 0 }] === -1 # => false
      class And < AbstractLogicOperationMatcher
        # @return [true, false]
        def ===(v)
          # every stored matcher has to accept the value
          @matchers.all? { |matcher| matcher === v }
        end
      end

      # Combines matchers into one which matches if any matches.
      # @example
      #   Or[Symbol, String] === :v  # => true
      #   Or[Symbol, String] === 'v' # => true
      #   Or[Symbol, String] === 1   # => false
      class Or < AbstractLogicOperationMatcher
        # @return [true, false]
        def ===(v)
          # a single accepting matcher is enough
          @matchers.any? { |matcher| matcher === v }
        end
      end
    end

    include EnvironmentConstants

    # Wraps a future; TEST unwraps it again.
    class Run
      # Returns the wrapped future of a Run, nil for any other object.
      TEST = -> v { v.future if v.is_a?(Run) }

      class << self
        # Bracket constructor, `Run[future]` is the same as `Run.new(future)`.
        def [](future)
          new(future)
        end
      end

      attr_reader :future

      def initialize(future)
        @future = future
      end
    end
    private_constant :Run

    class AbstractActor < Synchronization::Object

      include EnvironmentConstants
      include Concern::Logging
      safe_initialization!

      # @param [Promises::Channel] mailbox
      def initialize(mailbox, environment, name, executor)
        super()
        @Mailbox                = mailbox
        @Pid                    = Pid.new(self, name)
        @Linked                 = ::Set.new
        @Monitors               = {}
        @Monitoring             = {}
        @MonitoringLateDelivery = {}
        @Terminated             = Promises.resolvable_future
        @trap                   = false
        @reply                  = nil

        # A subclass of Environment is instantiated directly, any other module
        # is mixed into a fresh Environment instance.
        # The class check comes first because every Class is also a Module.
        built = if environment.is_a?(Class) && environment <= Environment
                  environment.new(self, executor)
                elsif environment.is_a?(Module)
                  Environment.new(self, executor).tap { |e| e.extend environment }
                else
                  raise ArgumentError,
                        "environment has to be a class inheriting from Environment or a module"
                end
        @Environment = built
      end

      # Non-blocking tell.
      # @return the result of chaining the mailbox push with a step yielding the pid,
      #   or an already fulfilled future of the pid when there is no mailbox
      def tell_op(message)
        log DEBUG, @Pid, told: message
        mailbox = @Mailbox
        return Promises.fulfilled_future(@Pid) unless mailbox

        mailbox.push_op(message).then { @Pid }
      end

      # Blocking tell.
      # @return [Pid, true, false] the pid when no timeout is given,
      #   otherwise what the mailbox push returned (false when there is no mailbox)
      def tell(message, timeout = nil)
        log DEBUG, @Pid, told: message
        mailbox = @Mailbox
        unless mailbox
          return timeout ? false : @Pid
        end

        pushed = mailbox.push(message, timeout)
        timeout ? pushed : @Pid
      end

      # Blocking ask: wraps the message in an Ask signal carrying a fresh probe future,
      # delivers it with #tell and then waits on the probe for the reply.
      #
      # @param [Object] message
      # @param [Numeric, nil] timeout in seconds, shared by the delivery and the wait for the reply
      # @param [Object] timeout_value handed to the probe as the value to use on timeout
      # @return [Object] whatever `probe.value!` returns
      # @raise [NoActor] when the termination future of this actor is resolved
      def ask(message, timeout, timeout_value)
        log DEBUG, @Pid, asked: message
        if @Terminated.resolved?
          raise NoActor.new(@Pid)
        else
          probe    = Promises.resolvable_future
          question = Ask.new(message, probe)
          if timeout
            start   = Concurrent.monotonic_time
            in_time = tell question, timeout
            # recheck it could have in the meantime terminated and drained mailbox
            raise NoActor.new(@Pid) if @Terminated.resolved?
            # has to be after resolved check, to catch case where it would return timeout_value
            # when it was actually terminated
            # only the part of the timeout not consumed by the delivery is left for the reply,
            # nothing is left when the delivery itself did not make it in time
            to_wait = if in_time
                        time = timeout - (Concurrent.monotonic_time - start)
                        time >= 0 ? time : 0
                      else
                        0
                      end
            # TODO (pitr-ch 06-Feb-2019): allow negative timeout everywhere, interpret as 0
            # NOTE(review): the third argument presumably resolves the probe on timeout
            # so that a late reply fails - confirm against Promises::ResolvableFuture#value!
            probe.value! to_wait, timeout_value, [true, nil, nil]
          else
            raise NoActor.new(@Pid) if @Terminated.resolved?
            tell question
            # the actor may have terminated after the question was delivered,
            # reject the probe here so the wait below cannot block
            probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
            probe.value!
          end
        end
      end

      # Non-blocking ask.
      # @param [Object] message
      # @param [Promises::ResolvableFuture] probe resolved with the reply
      # @return the rejected probe when already terminated, otherwise the flattened
      #   chain of the delivery followed by the probe
      def ask_op(message, probe)
        log DEBUG, @Pid, asked: message
        return probe.reject(NoActor.new(@Pid), false) if @Terminated.resolved?

        delivery = tell_op(Ask.new(message, probe))
        delivery.then do
          # the actor may have terminated while the question was being delivered
          probe.reject NoActor.new(@Pid), false if @Terminated.resolved?
          probe
        end.flat
      end

      # @return the termination future as returned by its `with_hidden_resolvable`
      def terminated
        @Terminated.with_hidden_resolvable()
      end

      # @return [Pid] the pid created for this actor by the constructor
      def pid
        @Pid
      end

      # @return [true, false] the current value of the trap flag
      def traps?
        @trap
      end

      # Sets the trap flag, coercing the given value to a boolean.
      # @return [true, false] the previous value of the flag
      def trap(value = true)
        previous = @trap
        @trap    = value ? true : false
        previous
      end

      # Builds a single receive rule.
      #
      # @param [#===] matcher
      # @param [Object] value a constant used as the job of the rule when no block is given
      # @yield the job of the rule
      # @return [::Array(#===, Object)] a matcher-job pair
      # @raise [ArgumentError] when both a block and a (truthy) value are given
      def on(matcher, value = nil, &block)
        raise ArgumentError, 'only one of block or value can be supplied' if block && value
        # `value || block` would turn an explicit `false` value into nil,
        # prefer the block and otherwise keep the value exactly as given
        [matcher, block || value]
      end

      # Not implemented at this level; always raises.
      # @raise [NotImplementedError]
      def receive(*rules, timeout: nil, timeout_value: nil, **options, &block)
        raise(NotImplementedError)
      end

      # Links this actor with the actor of the given pid.
      # @return [true]
      # @raise [NoActor] when the other actor is terminated and this actor does not trap
      def link(pid)
        return true if pid == @Pid
        # nothing to do when the link is already recorded
        return true unless @Linked.add?(pid)

        pid.tell Link.new(@Pid)
        return true unless pid.terminated.resolved?

        # no race since it only could get NoActor
        if @trap
          tell Terminate.new(pid, NoActor.new(pid))
        else
          @Linked.delete pid
          raise NoActor.new(pid)
        end
        true
      end

      # Removes the link to the actor of the given pid
      # and tells it to drop the link on its side as well.
      # @return [true] always, also when there was no link
      def unlink(pid)
        # Set#delete returns the set itself, which is always truthy, so it cannot tell
        # whether the pid was linked; Set#delete? returns nil when it was not.
        pid.tell UnLink.new(@Pid) if @Linked.delete?(pid)
        true
      end

      # @return [true, false] is the pid in the set of links of this actor?
      def linked?(pid)
        @Linked.include?(pid)
      end

      # Starts monitoring of the actor with the given pid.
      # The monitoring is recorded locally under a new reference and a Monitor signal
      # is sent to the monitored actor. When the monitored actor is already terminated,
      # this actor sends itself a DownSignal carrying NoActor instead.
      #
      # @param [Pid] pid
      # @return [Reference] the reference identifying this monitoring
      def monitor(pid)
        # *monitoring*    *monitored*
        # send Monitor
        # terminated?
        #                 terminate before getting Monitor
        #                 drain signals including the Monitor
        reference              = Reference.new
        @Monitoring[reference] = pid
        if pid.terminated.resolved?
          # always return no-proc when terminated
          tell DownSignal.new(pid, reference, NoActor.new(pid))
        else
          # otherwise let it race
          pid.tell Monitor.new(@Pid, reference)
          # no race, it cannot get anything else than NoActor
          tell DownSignal.new(pid, reference, NoActor.new(pid)) if pid.terminated.resolved?
        end
        reference
      end

      # Stops the monitoring identified by the reference.
      # @param [Reference] reference
      # @param [:flush, :info] options
      # @return [true, false]
      # @raise [ArgumentError] on any option other than :flush and :info
      def demonitor(reference, *options)
        info  = options.delete :info
        flush = options.delete :flush
        raise ArgumentError, "bad options #{options}" unless options.empty?

        pid          = @Monitoring.delete reference
        demonitoring = pid ? true : false
        pid.tell DeMonitor.new(@Pid, reference) if demonitoring

        # matches a down signal belonging to the reference, built only when needed
        down_signal = -> { And[DownSignal, -> m { m.reference == reference }] }

        if flush
          # remove (one) down message having reference from mailbox
          flushed = demonitoring && !!@Mailbox.try_pop_matching(down_signal.call)
          return info ? !flushed : true
        end

        return true unless info
        return false unless demonitoring
        return true unless @Mailbox.peek_matching(down_signal.call)

        @MonitoringLateDelivery[reference] = pid # allow to deliver the message once
        false
      end

      # @return [true, false] is there an active monitoring recorded under the reference?
      def monitoring?(reference)
        @Monitoring.key?(reference)
      end

      # Creates a new actor, optionally links and monitors it, and only then runs it.
      # @return [Pid, ::Array(Pid, Reference)] a pid-reference pair when monitor is truthy
      def spawn(*args,
                type:,
                channel:,
                environment:,
                name:,
                link:,
                monitor:,
                executor:,
                &body)
        actor   = ErlangActor.create(type, channel, environment, name, executor)
        new_pid = actor.pid
        # the keyword arguments shadow the methods of the same name,
        # the parentheses force the method calls
        link(new_pid) if link
        reference = monitor ? monitor(new_pid) : nil
        actor.run(*args, &body)
        monitor ? [new_pid, reference] : new_pid
      end

      # Resolves the probe of the message being processed, if there is one.
      # @return [true, false] false when there is no probe or resolving it returned a falsy value
      def reply_resolution(fulfilled, value, reason)
        return false unless @reply

        @reply.resolve(fulfilled, value, reason, false) ? true : false
      end

      # Terminates this actor when no pid is given,
      # otherwise sends an exit signal to the actor of the given pid.
      def terminate(pid = nil, reason, value: nil)
        return terminate_self(reason, value) unless pid

        # has to send it to itself even if pid equals self.pid
        signal = reason == :kill ? Kill.new(@Pid) : Terminate.new(@Pid, reason, false)
        pid.tell signal
      end

      private

      # Normalizes the rules array in place into matcher-job pairs.
      # @return [nil, ArgumentError] nil on success; the error is returned, not raised,
      #   when a block is combined with full rules
      def canonical_rules(rules, timeout, timeout_value, given_block)
        block       = given_block || -> v { v }
        block_error = -> { ArgumentError.new 'a block cannot be given if full rules are used' }

        if rules.empty?
          rules.push(on(ANY, &block))
        elsif rules.size == 1
          matcher = rules.first
          if matcher.is_a?(::Array) && matcher.size == 2
            return block_error.call if given_block
          else
            rules.replace([on(matcher, &block)])
          end
        elsif given_block
          return block_error.call
        end

        if timeout
          # TIMEOUT rule has to be first, to prevent any picking it up ANY
          last_timeout = rules.rindex { |r, _| r == TIMEOUT }
          rules.unshift(last_timeout ? rules[last_timeout] : on(TIMEOUT, timeout_value))
        end
        nil
      end

      # Produces the result of a matched rule.
      #
      # @param [Object] message the received message
      # @param [Proc, Object] job a Proc is executed in the context of
      #   `@Environment` with the message as its argument; any other
      #   object is returned as it is
      # @return [Object]
      def eval_task(message, job)
        return job unless job.is_a?(Proc)

        @Environment.instance_exec(message, &job)
      end

      # Notifies everybody interested in this actor's termination and
      # forgets them: linked actors get a {Terminate} signal, monitoring
      # actors get a {DownSignal} carrying their reference.
      #
      # @param [Object] reason the termination reason
      def send_exit_messages(reason)
        @Linked.each { |linked_pid| linked_pid.tell Terminate.new(@Pid, reason) }
        @Linked.clear

        @Monitors.each do |reference, monitoring_pid|
          monitoring_pid.tell DownSignal.new(@Pid, reference, reason)
        end
        @Monitors.clear
      end

      # @return [true, false] whether a reply target (`@reply`) is currently set,
      #   which `consume_signal` does when it unwraps an {Ask}
      def asked?
        @reply ? true : false
      end

      # Rejects the pending reply, if there is one, and forgets it.
      #
      # @param [Object] reason rejection reason, {NoReply} by default
      # @return [nil]
      def clean_reply(reason = NoReply)
        return unless @reply

        @reply.reject(reason, false)
        @reply = nil
      end

      # Processes a message popped from the mailbox.
      #
      # Regular messages are returned untouched. Signals update the actor's
      # bookkeeping and are either fully consumed (`NOTHING` is returned) or
      # translated into a value the actor body should see (the asked message,
      # a {Down}, or whatever `consume_exit` returns).
      #
      # @param [Object] message
      # @return [Object] `NOTHING` when the signal was consumed internally
      # @raise [RuntimeError] for a signal of an unknown class
      def consume_signal(message)
        # regular message
        return message unless AbstractSignal === message

        case message
        when Ask
          # remember where to reply and hand over the wrapped message
          @reply = message.probe
          message.message
        when Link
          @Linked.add message.from
          NOTHING
        when UnLink
          @Linked.delete message.from
          NOTHING
        when Monitor
          @Monitors[message.reference] = message.from
          NOTHING
        when DeMonitor
          @Monitors.delete message.reference
          NOTHING
        when Kill
          terminate :killed
        when DownSignal
          expected = @Monitoring.delete(message.reference) ||
              @MonitoringLateDelivery.delete(message.reference)

          # ignore down message if no longer monitoring, and following case
          #
          # *monitoring*    *monitored*
          # send Monitor
          #                 terminate
          # terminated?
          #                 drain signals # generates second DOWN which is dropped here
          # already reported as :noproc
          #
          # otherwise put into a queue
          expected ? Down.new(message.from, message.reference, message.info) : NOTHING
        when Terminate
          consume_exit message
        else
          raise "unknown message #{message}"
        end
      end

      # Decides what to do with an exit ({Terminate}) signal.
      #
      # An exit caused by termination of a linked actor (`link_terminated`)
      # is only honoured while the sender is still in `@Linked`; it is
      # removed at the same time so that a second exit signal from the same
      # actor is dropped. Explicit exits (`link_terminated` false) are
      # always honoured.
      #
      # @param [Terminate] exit_message destructured into `from, reason`
      # @return [Object] {Terminated} when trapping, `NOTHING` when the
      #   signal is ignored; otherwise the actor terminates itself
      def consume_exit(exit_message)
        from, reason = exit_message

        # `@Linked` is used as a Set (see `add` in consume_signal).
        # Set#delete returns the set itself, which is always truthy, so it
        # cannot tell whether `from` was linked; Set#delete? returns nil
        # when the element was not present.
        honoured = !exit_message.link_terminated || @Linked.delete?(from)

        unless honoured
          # *link*          *exiting*
          # send Link
          #                 terminate
          # terminated?
          #                 drain signals # generates second Terminated which is dropped here
          # already processed exit message, do nothing
          return NOTHING
        end

        return Terminated.new(from, reason) if @trap

        case reason
        when :normal
          # a normal exit of another actor does not affect this one
          from == @Pid ? terminate(:normal) : NOTHING
        else
          terminate reason
        end
      end

      # Drains whatever is already in the mailbox before the actor starts.
      # Everything found there has to be a signal which `consume_signal`
      # handles internally (returns `NOTHING`).
      #
      # @raise [RuntimeError] when a popped message is not such a signal
      # @return [nil]
      def initial_signal_consumption
        while (message = @Mailbox.try_pop)
          next if consume_signal(message) == NOTHING
          raise 'it was not consumable signal'
        end
      end

      # Terminates the current actor from within its own execution.
      # Abstract in this class: called by `terminate` when no pid is given,
      # and overridden by OnPool and OnThread, which both unwind using `throw`.
      #
      # @param [Object] reason termination reason
      # @param [Object] value final value of the actor
      # @raise [NotImplementedError] always, in this base implementation
      def terminate_self(reason, value)
        raise NotImplementedError
      end

      # Cleans up once the actor has terminated: rejects a pending reply and
      # drains the mailbox, answering the requests which can still be
      # answered, then drops the mailbox.
      #
      # @param [Object] final_reason only used for logging
      # @return [nil]
      def after_termination(final_reason)
        log DEBUG, @Pid, terminated: final_reason
        clean_reply NoActor.new(@Pid)

        until (message = @Mailbox.try_pop(NOTHING)) == NOTHING
          case message
          when Monitor
            # The actor is terminated so we must return NoActor,
            # even though we still know the reason.
            # Otherwise it would return different reasons non-deterministically.
            message.from.tell DownSignal.new(@Pid, message.reference, NoActor.new(@Pid))
          when Link
            # same as for Monitor
            # NOTE(review): this tells a bare NoActor rather than a Terminate
            # signal, so consume_signal on the receiving side treats it as a
            # regular message - confirm this is intended.
            message.from.tell NoActor.new(@Pid)
          when Ask
            message.probe.reject(NoActor.new(@Pid), false)
          end
          # normal messages and other signals are thrown away
        end

        @Mailbox = nil
      end
    end

    private_constant :AbstractActor

    # Actor implementation whose body and receive steps are executed as
    # futures on the given executor instead of on a dedicated thread.
    #
    # `receive` and `terminate_self` do not return: they `throw JUMP` to the
    # `catch(JUMP)` in `inner_run` (or `internal_receive`), which then decides
    # whether to wait for another message or to terminate.
    class OnPool < AbstractActor

      # @param executor the executor on which the first future is scheduled
      def initialize(channel, environment, name, executor)
        super channel, environment, name, executor
        @Executor       = executor
        # rules ([matcher, job] pairs) set by the last #receive; empty means
        # the actor has nothing to wait for and terminates
        @behaviour      = []
        # when true the rules are kept after a message is processed
        @keep_behaviour = false
      end

      # Starts the actor. Signals already in the mailbox are consumed first,
      # then the chain of futures built by `inner_run` is started;
      # `after_termination` runs with the final reason and any error raised
      # in the chain is logged.
      #
      # NOTE(review): `Run`/`Run::TEST` are defined elsewhere; presumably they
      # let a future continue with another returned future - confirm.
      def run(*args, &body)
        # without a block the environment's `start` method is the body
        body ||= -> { start }

        initial_signal_consumption
        inner_run(*args, &body).
            run(Run::TEST).
            then(&method(:after_termination)).
            rescue { |e| log ERROR, e }
      end

      # Stores the rules and timeout to be used for the next message and
      # unwinds to the enclosing `catch(JUMP)`; it never returns normally.
      #
      # @param rules matchers or [matcher, job] pairs, see `canonical_rules`
      # @param timeout [Numeric, nil]
      # @param timeout_value value produced when the timeout elapses
      # @param keep [true, false] keep the rules for subsequent messages
      # @raise [ArgumentError] when a block is combined with full rules
      def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block)
        clean_reply
        err = canonical_rules rules, timeout, timeout_value, given_block
        raise err if err

        @keep_behaviour = keep
        @timeout        = timeout
        @behaviour      = rules
        throw JUMP, [RECEIVE]
      end

      private

      # Unwinds to the enclosing `catch(JUMP)` with a termination request.
      def terminate_self(reason, value)
        throw JUMP, [TERMINATE, reason, value]
      end

      # Builds one step of the actor as a future.
      #
      # With a body (first step) the body is executed on `@Executor`;
      # without it the step waits for a message through `internal_receive`
      # and applies the current behaviour to it.
      # The step yields `[kind, reason, value]`:
      # * TERMINATE - explicit termination or a rescued error
      # * RECEIVE   - `receive` was called, continue with another step
      # * NOTHING   - the body/job returned normally; continue only when
      #   rules are still set (e.g. kept with `keep: true`)
      def inner_run(*args, &body)
        first       = !!body
        future_body = -> message, _actor do
          kind, reason, value =
              if message.is_a?(::Array) && message.first == TERMINATE
                message
              else
                begin
                  catch(JUMP) do
                    [NOTHING,
                     :normal,
                     first ? @Environment.instance_exec(*args, &body) : apply_behaviour(message)]
                  end
                rescue => e
                  [TERMINATE, e, nil]
                end
              end

          case kind
          when TERMINATE
            send_exit_messages reason
            @Terminated.resolve(reason == :normal, value, reason)
            reason
          when RECEIVE
            Run[inner_run]
          when NOTHING
            if @behaviour.empty?
              # nothing to wait for, the actor is done
              send_exit_messages reason
              @Terminated.resolve(reason == :normal, value, reason)
              reason
            else
              Run[inner_run]
            end
          else
            raise "bad kind: #{kind.inspect}"
          end
        end

        if first
          Promises.future_on(@Executor, nil, self, &future_body)
        else
          internal_receive.run(Run::TEST).then(self, &future_body)
        end
      end

      # Returns a future of the next message matching the current rules
      # (an {Ask} is matched by the message it wraps). Signals consumed
      # internally restart the wait with the timeout reduced by the time
      # already spent.
      def internal_receive
        raise if @behaviour.empty?
        rules_matcher  = Or[*@behaviour.map(&:first)]
        matcher        = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
        start          = nil
        message_future = case @timeout
                         when 0
                           # no waiting: take a matching message or TIMEOUT right away
                           Promises.fulfilled_future @Mailbox.try_pop_matching(matcher, TIMEOUT)
                         when Numeric
                           pop   = @Mailbox.pop_op_matching(matcher)
                           start = Concurrent.monotonic_time
                           # FIXME (pitr-ch 30-Jan-2019): the scheduled future should be cancelled
                           (Promises.schedule(@timeout) { TIMEOUT } | pop).then(pop) do |message, p|
                             if message == TIMEOUT && !p.resolve(true, TIMEOUT, nil, false)
                               # timeout raced with probe resolution, take the value instead
                               p.value
                             else
                               message
                             end
                           end
                         when nil
                           # wait without a time limit
                           @Mailbox.pop_op_matching(matcher)
                         else
                           raise
                         end

        message_future.then(start, self) do |message, s, _actor|
          log DEBUG, pid, got: message
          catch(JUMP) do
            if (message = consume_signal(message)) == NOTHING
              # `s` is only set for a Numeric timeout: keep the remaining time, never negative
              @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s
              Run[internal_receive]
            else
              message
            end
          end
        end
      end

      # Evaluates the job of the first rule matching the message.
      # The rules are dropped first unless they were set with `keep: true`.
      def apply_behaviour(message)
        @behaviour.each do |rule, job|
          if rule === message
            @behaviour = [] unless @keep_behaviour
            return eval_task(message, job)
          end
        end
        raise 'should not reach'
      end
    end

    private_constant :OnPool

    # Actor implementation which runs its body on a dedicated thread;
    # `receive` blocks that thread until a matching message arrives.
    class OnThread < AbstractActor
      def initialize(channel, environment, name, executor)
        super channel, environment, name, executor
        @Thread = nil
      end

      # Tag used with catch/throw to unwind the actor body on termination.
      TERMINATE = Module.new
      private_constant :TERMINATE

      # Starts the actor: signals already in the mailbox are consumed, then
      # the body is executed in a new thread. When the body finishes, is
      # terminated, or raises a StandardError, exit messages are sent, the
      # `@Terminated` future is resolved and `after_termination` is run.
      #
      # @param args arguments passed to the body
      # @return [Thread] the thread running the actor
      def run(*args, &body)
        initial_signal_consumption
        @Thread = Thread.new(@Terminated, self) do |terminated, _actor| # sync point
          # Escalate unexpected failures of this actor's thread only.
          # `Thread.abort_on_exception = true` would change the flag for
          # every thread in the process as a side effect of spawning an actor.
          Thread.current.abort_on_exception = true

          final_reason = begin
            reason, value = catch(TERMINATE) do
              [:normal, @Environment.instance_exec(*args, &body)]
            end
            send_exit_messages reason
            terminated.resolve(reason == :normal, value, reason)
            reason
          rescue => e
            # an error in the body is the actor's termination reason
            send_exit_messages e
            terminated.reject e
            e
          end

          after_termination final_reason
          @Thread = nil
        end
      end

      # Blocks until a message matching one of the rules arrives and returns
      # the result of the matching rule. An {Ask} is matched by the message
      # it wraps. Signals consumed internally do not end the wait.
      #
      # @param rules matchers or [matcher, job] pairs, see `canonical_rules`
      # @param timeout [Numeric, nil]
      # @param timeout_value value produced when the timeout elapses
      # @raise [ArgumentError] when a block is combined with full rules
      # @return [Object]
      def receive(*rules, timeout: nil, timeout_value: nil, &given_block)
        clean_reply

        err = canonical_rules rules, timeout, timeout_value, given_block
        raise err if err

        rules_matcher = Or[*rules.map(&:first)]
        matcher       = -> m { m.is_a?(Ask) ? rules_matcher === m.message : rules_matcher === m }
        while true
          # NOTE(review): the full `timeout` is passed again on every
          # iteration, so time spent before an internally consumed signal is
          # not subtracted (OnPool does subtract it) - confirm this is intended.
          message = @Mailbox.pop_matching(matcher, timeout, TIMEOUT)
          log DEBUG, pid, got: message
          unless (message = consume_signal(message)) == NOTHING
            rules.each do |rule, job|
              return eval_task(message, job) if rule === message
            end
          end
        end
      end

      private

      # Unwinds the actor body to the `catch(TERMINATE)` in #run.
      def terminate_self(reason, value)
        throw TERMINATE, [reason, value]
      end
    end

    private_constant :OnThread

    # Common superclass of the internal signals exchanged between actors.
    # `consume_signal` uses `AbstractSignal === message` to tell signals
    # apart from regular messages.
    class AbstractSignal < Synchronization::Object
      # NOTE(review): provided by Synchronization::Object; presumably makes
      # instance variables assigned in the constructor safely visible to
      # other threads - confirm.
      safe_initialization!
    end

    private_constant :AbstractSignal

    # Signal wrapping a message for which a reply is expected.
    # When consumed, the actor stores `probe` as its reply target
    # and processes `message` as the received message.
    class Ask < AbstractSignal
      attr_reader :message, :probe

      # @param [Object] message the wrapped message
      # @param [Promises::Resolvable] probe resolved or rejected with the reply
      # @raise [ArgumentError] when probe is not a Promises::Resolvable
      def initialize(message, probe)
        super()
        @message = message
        @probe   = probe
        raise ArgumentError, 'probe is not Resolvable' unless probe.is_a? Promises::Resolvable
      end
    end

    private_constant :Ask

    # Mixin for messages and signals carrying the pid of their sender.
    # Provides value equality based on the class and `from`.
    module HasFrom

      # @return [Pid]
      attr_reader :from

      # @!visibility private
      def initialize(from)
        # noinspection RubySuperCallWithoutSuperclassInspection
        super()
        @from = from
      end

      # @return [true, false]
      def ==(o)
        o.class == self.class && o.from == @from
      end

      # Delegates to `==` at call time.
      # An alias would stay bound to this module's `==`, so includers which
      # refine `==` (and `hash`) with more fields would end up with an
      # `eql?` ignoring those fields and disagreeing with `==` and `hash`.
      #
      # @return [true, false]
      def eql?(o)
        self == o
      end

      # @return [Integer]
      def hash
        @from.hash
      end
    end

    private_constant :HasFrom

    # Mixin for messages and signals carrying the sender's pid and a reason.
    # Destructures as `from, reason = message`.
    module HasReason
      include HasFrom

      # @return [Object]
      attr_reader :reason

      # @!visibility private
      def initialize(from, reason)
        # noinspection RubySuperCallWithoutSuperclassInspection
        super(from)
        @reason = reason
      end

      # @return [::Array(Pid, Object)]
      def to_ary
        [from, reason]
      end

      # Equal when HasFrom#== holds and the reasons are equal too.
      #
      # @return [true, false]
      def ==(o)
        # noinspection RubySuperCallWithoutSuperclassInspection
        return false unless super(o)
        o.reason == reason
      end

      # @return [Integer]
      def hash
        [from, reason].hash
      end
    end

    private_constant :HasReason

    # Mixin for signals carrying the sender's pid and a monitor reference.
    # Destructures as `from, reference = signal`.
    module HasReference
      include HasFrom

      # @return [Reference]
      attr_reader :reference

      # @!visibility private
      def initialize(from, reference)
        # noinspection RubySuperCallWithoutSuperclassInspection
        super(from)
        @reference = reference
      end

      # @return [::Array(Pid, Reference)]
      def to_ary
        [from, reference]
      end

      # Equal when HasFrom#== holds and the references are equal too.
      #
      # @return [true, false]
      def ==(o)
        # noinspection RubySuperCallWithoutSuperclassInspection
        return false unless super(o)
        o.reference == reference
      end

      # @return [Integer]
      def hash
        [from, reference].hash
      end
    end

    private_constant :HasReference

    # Exit signal sent to an actor, processed by `consume_exit`.
    class Terminate < AbstractSignal
      include HasReason

      # true when the signal was caused by termination of a linked actor
      # (the default, used by `send_exit_messages`); false when sent
      # explicitly by `terminate(pid, reason)`.
      attr_reader :link_terminated

      # @param [Pid] from
      # @param [Object] reason
      # @param [true, false] link_terminated
      def initialize(from, reason, link_terminated = true)
        super from, reason
        @link_terminated = link_terminated
      end
    end

    private_constant :Terminate

    # Signal sent by `terminate(pid, :kill)`.
    # When consumed, the receiving actor terminates itself with reason `:killed`.
    class Kill < AbstractSignal
      include HasFrom
    end

    private_constant :Kill

    # Signal requesting a link. When consumed, the receiving actor
    # adds the sender (`from`) to its linked actors.
    class Link < AbstractSignal
      include HasFrom
    end

    private_constant :Link

    # Signal cancelling a link. When consumed, the receiving actor
    # removes the sender (`from`) from its linked actors.
    class UnLink < AbstractSignal
      include HasFrom
    end

    private_constant :UnLink

    # Signal requesting monitoring. When consumed, the receiving actor
    # records the sender (`from`) under `reference` among its monitors,
    # to be notified with a {DownSignal} on termination.
    class Monitor < AbstractSignal
      include HasReference
    end

    private_constant :Monitor

    # Signal cancelling monitoring. When consumed, the receiving actor
    # forgets the monitor registered under `reference`.
    class DeMonitor < AbstractSignal
      include HasReference
    end

    private_constant :DeMonitor

    # A message sent when an actor terminates.
    # Actors trapping exits receive it instead of being terminated themselves.
    # Destructures as `from, reason = terminated`.
    class Terminated
      # @return [Pid]
      attr_reader :from
      # @return [Object]
      attr_reader :reason

      # @!visibility private
      def initialize(from, reason)
        @from, @reason = from, reason
      end

      # @return [::Array(Pid, Object)]
      def to_ary
        [from, reason]
      end

      # Value equality: same class, same sender and same reason.
      #
      # @return [true, false]
      def ==(o)
        return false unless o.class == self.class
        o.from == @from && o.reason == reason
      end

      alias_method :eql?, :==

      # @return [Integer]
      def hash
        [from, reason].hash
      end
    end

    # Signal told to a monitoring actor when the monitored actor terminates.
    # `consume_signal` turns it into a {Down} message while the reference is
    # still being monitored, and drops it otherwise.
    class DownSignal < AbstractSignal
      include HasReference

      # @return [Object]
      attr_reader :info

      # @!visibility private
      def initialize(from, reference, info)
        super(from, reference)
        @info = info
      end

      # @return [::Array(Pid, Reference, Object)]
      def to_ary
        [from, reference, info]
      end

      # @return [true, false]
      def ==(o)
        return false unless super(o)
        o.info == info
      end

      # @return [Integer]
      def hash
        to_ary.hash
      end
    end

    private_constant :DownSignal

    # A message sent by a monitored actor when it terminates.
    # Destructures as `from, reference, info = down`.
    class Down
      # @return [Pid]
      attr_reader :from
      # @return [Reference]
      attr_reader :reference
      # @return [Object]
      attr_reader :info

      # @!visibility private
      def initialize(from, reference, info)
        @from, @reference, @info = from, reference, info
      end

      # @return [::Array(Pid, Reference, Object)]
      def to_ary
        [from, reference, info]
      end

      # Value equality: same class, sender, reference and info.
      #
      # @return [true, false]
      def ==(o)
        return false unless o.class == self.class
        o.from == @from && o.reference == @reference && o.info == @info
      end

      alias_method :eql?, :==

      # @return [Integer]
      def hash
        to_ary.hash
      end
    end

    # Abstract error class for ErlangActor errors.
    # {NoActor} and {NoReply} inherit from it.
    class Error < Concurrent::Error
    end

    # An error used when an actor tries to link or monitor a terminated actor.
    # It is also used to reject asks which a terminated actor can no longer answer.
    # Two instances are equal when they refer to the same pid.
    class NoActor < Error
      # @return [Pid]
      attr_reader :pid

      # @param [Pid] pid its string representation becomes the error message
      # @return [self]
      def initialize(pid = nil)
        super(pid.to_s)
        @pid = pid
      end

      # @return [true, false]
      def ==(o)
        return false unless o.class == self.class
        o.pid == pid
      end

      alias_method :eql?, :==

      # @return [Integer]
      def hash
        pid.hash
      end
    end

    # An error used when an actor is asked but no reply was given, or
    # when the actor terminates before it gives a reply.
    # It is the default rejection reason of `clean_reply`.
    class NoReply < Error
    end

    # @!visibility private
    # Instantiates the actor implementation registered for `type` in
    # KLASS_MAP and logs where it was created from.
    #
    # @!visibility private
    # @param type a key of KLASS_MAP
    # @raise [KeyError] for an unknown type
    # @return the new, not yet running, actor
    def self.create(type, channel, environment, name, executor)
      klass = KLASS_MAP.fetch(type)
      actor = klass.new(channel, environment, name, executor)
      log Concern::Logging::DEBUG, actor.pid, created: caller[1] if actor
      actor
    end

    # Maps the `type` accepted by {ErlangActor.create} to the implementing
    # class; both the symbolic names and the classes themselves are accepted.
    # Frozen, the lookup table is not meant to change at runtime.
    KLASS_MAP = {
        on_thread: OnThread,
        on_pool:   OnPool,
        OnThread   => OnThread,
        OnPool     => OnPool,
    }.freeze
    private_constant :KLASS_MAP
  end
end