symplely/coroutine

View on GitHub
Coroutine/Kernel.php

Summary

Maintainability
F
1 wk
Test Coverage
<?php

declare(strict_types=1);

namespace Async;

use Async\Channel;
use Async\CoroutineInterface;
use Async\TaskInterface;
use Async\LengthException;
use Async\InvalidStateError;
use Async\InvalidArgumentException;
use Async\TaskTimeout;
use Async\TimeoutError;
use Async\CancelledError;
use Async\Panic;
use Async\FiberInterface;
use Async\Di\InjectionInterface;
use Async\Misc\AsyncIterator;
use Async\Misc\Contextify;
use Async\Misc\ContextInterface;
use Async\Misc\Semaphore;
use Async\Misc\TimeoutAfter;
use Async\Threads\Thread;
use Async\Threads\TWorker;
use Async\Spawn\Channeled;
use Async\Spawn\FutureInterface;
use Psr\Container\ContainerInterface;

use function Async\Worker\awaitable_future;

/**
 * The Kernel
 * This class is used for Communication between the tasks and the scheduler
 *
 * The `yield` keyword in your code acts both as an interrupt and as a way to
 * pass information to (and from) the scheduler.
 */
final class Kernel
{
  /**
   * The callable wrapped by this instance; `__invoke()` calls it with the
   * calling task/fiber and the scheduler.
   * @var callable
   */
  protected $callback;

  /**
   * Race count for the next `gather()`, set by `gatherWait()`.
   * `gather()` copies the value and resets it to `0`.
   * @var int
   */
  protected static $gatherCount = 0;

  /**
   * Whether the next `gather()` should propagate an exception, set by `gatherWait()`.
   * `gather()` copies the value and resets it to `true`.
   * @var bool
   */
  protected static $gatherShouldError = true;

  /**
   * The `$clear` flag of `gatherWait()` for the next `gather()`.
   * `gather()` copies the value and resets it to `false`.
   * @var bool
   */
  protected static $gatherShouldClearCancelled = false;

  /**
   * Custom `Gather` not-started state; `gather()` passes it to the task's
   * `isCustomState()` check. Set through `gatherController()`.
   * @var string
   */
  protected static $isCustomSate = 'n/a';

  /**
   * Execute on already pre-completed `Gather` tasks.
   * When callable, `gather()` uses its return value instead of the task's `result()`.
   * @var callable
   */
  protected static $onPreComplete;

  /**
   * Execute on completed `Gather` tasks.
   * @var callable
   */
  protected static $onCompleted;

  /**
   * Execute on exception `Gather` tasks.
   * @var callable
   */
  protected static $onError;

  /**
   * Execute on cancelled `Gather` tasks.
   * @var callable
   */
  protected static $onCancel;

  /**
   * Execute on not started `Gather` tasks.
   * When callable, `gather()` calls it with the task and the scheduler.
   * @var callable
   */
  protected static $onProcessing;

  /**
   * Execute cleanup on `GatherWait()` race tasks no longer needed.
   * @var callable
   */
  protected static $onClear;

  /**
   * Wrap a scheduler request.
   *
   * @param callable $callback stored as-is and later called by `__invoke()`
   * with the calling task/fiber and the scheduler.
   */
  public function __construct(callable $callback)
  {
    $this->callback = $callback;
  }

  /**
   * Hands the calling `task` or `fiber`, together with the scheduler, to the wrapped callback
   * and returns whatever that callback returns.
   *
   * Anything that is neither a `TaskInterface` nor a `FiberInterface` is reported through `panic()`.
   *
   * @param TaskInterface|FiberInterface $taskFiber
   * @param CoroutineInterface $coroutine
   * @return mixed
   */
  public function __invoke($taskFiber, CoroutineInterface $coroutine)
  {
    $isTask = $taskFiber instanceof TaskInterface;
    $isFiber = $taskFiber instanceof FiberInterface;
    if ($isTask || $isFiber) {
      return ($this->callback)($taskFiber, $coroutine);
    }

    // @codeCoverageIgnoreStart
    \panic('Must be instance of "Async\TaskInterface" or "Async\FiberInterface"');
    // @codeCoverageIgnoreEnd
  }

  /**
   * Returns the current context task ID.
   *
   * The method only builds the scheduler request; when it is processed, the calling
   * task's own id is handed back to it through `sendValue()` and the task is rescheduled.
   *
   * @return int task id instance
   */
  public static function currentTask()
  {
    $reportOwnId = function (TaskInterface $caller, CoroutineInterface $scheduler) {
      $ownId = $caller->taskId();
      $caller->sendValue($ownId);
      $scheduler->schedule($caller);
    };

    return new self($reportOwnId);
  }

  /**
   * Set current Task context type, currently either `paralleled`, `async`, `async_method`, `threaded`, `awaited`,
   * `stateless`, or `monitored`.
   *
   * After the type is applied, the calling task receives its own task ID and is rescheduled.
   *
   * - This function needs to be prefixed with `yield`
   *
   * @param string $context
   * @return int
   */
  public static function taskType(string $context = 'async')
  {
    $applyContext = function (TaskInterface $caller, CoroutineInterface $scheduler) use ($context) {
      $caller->taskType($context);

      $ownId = $caller->taskId();
      $caller->sendValue($ownId);
      $scheduler->schedule($caller);
    };

    return new self($applyContext);
  }

  /**
   * Wraps a **Generator** (a `coroutine`) into a new task through the scheduler's `createTask()`
   * and schedules its execution. Whatever the scheduler returns for the new task is sent back
   * to the calling task, which is then rescheduled.
   *
   * @see https://docs.python.org/3.10/library/asyncio-task.html#creating-tasks
   * @source https://github.com/python/cpython/blob/11909c12c75a7f377460561abc97707a4006fc07/Lib/asyncio/tasks.py#L331
   *
   * @param \Generator $coroutines
   * @param bool $isAsync should task type be set to a `async` function
   *
   * @return int task ID
   */
  public static function createTask(\Generator $coroutines, bool $isAsync = false)
  {
    $spawn = function (TaskInterface $caller, CoroutineInterface $scheduler) use ($coroutines, $isAsync) {
      $created = $scheduler->createTask($coroutines, $isAsync);
      $caller->sendValue($created);
      $scheduler->schedule($caller);
    };

    return new self($spawn);
  }

  /**
   * Creates a Channel, similar to Google's Go language.
   *
   * The channel is built with `Channel::make()` from the calling task and the scheduler,
   * sent back to the calling task, and the task is rescheduled.
   *
   * @return object
   */
  public static function make()
  {
    $openChannel = function (TaskInterface $caller, CoroutineInterface $scheduler) {
      $channel = Channel::make($caller, $scheduler);
      $caller->sendValue($channel);
      $scheduler->schedule($caller);
    };

    return new self($openChannel);
  }

  /**
   * Registers the calling task as the receiver of the Channel, similar to Google Go language.
   * The calling task is rescheduled afterwards.
   *
   * @param Channel $channel
   */
  public static function receiver(Channel $channel)
  {
    $bindReceiver = function (TaskInterface $caller, CoroutineInterface $scheduler) use ($channel) {
      $channel->receiver($caller);
      $scheduler->schedule($caller);
    };

    return new self($bindReceiver);
  }

  /**
   * Wait to receive a message, similar to Google Go language.
   *
   * Only `Channel::receive()` is called here; the calling task is not rescheduled by this request.
   *
   * @param Channel $channel
   */
  public static function receive(Channel $channel)
  {
    $awaitMessage = function (TaskInterface $caller, CoroutineInterface $scheduler) use ($channel) {
      $channel->receive();
    };

    return new self($awaitMessage);
  }

  /**
   * Send a message to a Channel, similar to Google Go language.
   *
   * The message goes to the channel's receiver task, or to its sender task when no receiver
   * task is set. A positive `$taskId` that resolves to an existing task overrides both.
   * The destination and the calling task are then scheduled, in that order.
   *
   * @param Channel $channel
   * @param mixed $message
   * @param int $taskId
   */
  public static function sender(Channel $channel, $message = null, int $taskId = 0)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($channel, $message, $taskId) {
        $receiverTask = $channel->receiverTask();
        $senderTask = $channel->senderTask();

        $destination = $senderTask;
        if ($receiverTask instanceof TaskInterface) {
          $destination = $receiverTask;
        }

        // An explicit, existing task id takes precedence over the channel's own tasks.
        $explicit = $scheduler->getTask($taskId);
        if ($taskId > 0 && $explicit instanceof TaskInterface) {
          $destination = $explicit;
        }

        $destination->sendValue($message);
        $scheduler->schedule($destination);
        $scheduler->schedule($caller);
      }
    );
  }

  /**
   * Cancel a task by **throwing** a `CancelledError` exception, this will also delay kill/remove
   * the task, the status of such can be checked with `is_cancelled` and `is_cancelling` functions.
   * Optionally pass custom cancel state and error message for third party code integration.
   *
   * Outcome for the calling task:
   * - target found: receives `true`, unless it cancels itself, in which case the exception is set on it.
   * - target not found but reported completed by the scheduler: receives `true`.
   * - otherwise an `InvalidArgumentException` is thrown from the scheduler callback.
   *
   * @see https://docs.python.org/3.10/library/asyncio-task.html#asyncio.Task.cancel
   * @source https://github.com/python/cpython/blob/bb0b5c12419b8fa657c96185d62212aea975f500/Lib/asyncio/tasks.py#L181
   *
   * @param int $tid task id instance
   * @param mixed $customState applied to the target with `customState()` when not `empty()`
   * @param string $errorMessage prefix of the exception message used when `$tid` is unknown
   * @param bool $type when `true`, a `TaskCancelled` is used instead of a `CancelledError`
   * @return bool
   *
   * @throws InvalidArgumentException when `$tid` is neither a known nor a completed task
   */
  public static function cancelTask($tid = 0, $customState = null, string $errorMessage = 'Invalid task ID!', bool $type = false)
  {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine) use ($tid, $customState, $errorMessage, $type) {
        $cancelTask = $coroutine->getTask($tid);
        if (!\is_null($cancelTask)) {
          $coroutine->clearTimeout($cancelTask);
          if (!empty($customState))
            $cancelTask->customState($customState);

          // "Context" cancellation: the caller reports `hasWith()` or the target belongs to a group.
          // In that case a second, delayed `cancelTask()` on the scheduler is queued as its own task.
          $isContext = $task->hasWith() || $cancelTask->hasGroup();
          if ($isContext)
            $coroutine->createTask(\delayer(0, [$coroutine, 'cancelTask'], $tid, $customState, $errorMessage));

          // A task registered as the target's caller is detached
          // and resumed with a `CancelledError` of its own.
          if ($cancelTask->hasCaller()) {
            $unjoined = $cancelTask->getCaller();
            $cancelTask->setCaller();
            $cid = $cancelTask->taskId();
            $unjoined->setException(new CancelledError("Task {$cid}!"));
            $coroutine->schedule($unjoined);
          }

          $error = ($isContext || $type) ? new TaskCancelled("Task {$tid}!") : new CancelledError("Task {$tid}!");
          $customData = $cancelTask->getCustomData();
          // Targets backed by a future/thread: the caller is resumed with `true` and recorded as the
          // target's caller; a future is stopped and given the exception, a thread/worker is
          // cancelled through its own `cancel()`. The target itself is not scheduled here.
          if ($customData instanceof FutureInterface || $customData instanceof Thread || $customData instanceof TWorker) {
            $task->sendValue(true);
            $cancelTask->setCaller($task);
            if ($customData instanceof FutureInterface) {
              $customData->stop();
              $cancelTask->setException($error);
            } else
              $customData->cancel($tid);

            return $coroutine->schedule($task);
          }

          // Plain task/fiber: inject the exception and schedule the target so it can observe it.
          $cancelTask->setException($error);
          if ($cancelTask instanceof FiberInterface)
            $coroutine->scheduleFiber($cancelTask);
          else
            $coroutine->schedule($cancelTask);

          // Self-cancellation: mark the caller and give it the exception instead of a result.
          // NOTE(review): in this case the same task was already scheduled just above and is
          // scheduled again below - confirm the scheduler tolerates the double entry.
          if ($task->taskId() === $cancelTask->taskId()) {
            $task->taskType('cancellation');
            $task->setException($error);
          } else {
            $task->sendValue(true);
          }

          $coroutine->schedule($task);
        } elseif ($coroutine->isCompleted($tid)) {
          // Target no longer exists but the scheduler reports it completed: report success.
          $task->sendValue(true);
          $coroutine->schedule($task);
        } else {
          throw new InvalidArgumentException($errorMessage . ' ' . $tid);
        }
      }
    );
  }

  /**
   * Wait for the task to terminate and return its result.
   *
   * - An unknown `$tid` simply resumes the calling task.
   * - A target that already holds an exception passes it to the calling task immediately.
   * - Otherwise the target is scheduled and joined; thread/worker backed targets are joined
   *   through their custom data object.
   *
   * - This function needs to be prefixed with `yield`
   *
   * @param int $tid task id instance
   * @return mixed
   */
  public static function joinTask(int $tid)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($tid) {
        $target = $scheduler->getTask($tid);
        if (\is_null($target)) {
          // Nothing to wait on.
          $scheduler->schedule($caller);
          return;
        }

        $target->setCaller($caller);
        if ($target->hasGroup()) {
          $target->discardGroup();
        }

        if ($target->exception()) {
          $caller->setException($target->exception());
          return $scheduler->schedule($caller);
        }

        $payload = $target->getCustomData();
        $isThreaded = $payload instanceof Thread || $payload instanceof TWorker;

        $scheduler->schedule($target);

        return $isThreaded ? $payload->join($tid) : $target->join();
      }
    );
  }

  /**
   * Performs a clean application shutdown, killing tasks/processes, and resetting all data, except **created** `async` functions.
   *
   * Provide $skipTask in case this is called by a Signal Handler.
   *
   * @param int $skipTask - Defaults to the main parent task: the value `1` is replaced
   * with `Co::getUnique('parent')`.
   * - The `$skipTask` task is looked up before the scheduler shuts down and, when it is a task,
   * is scheduled again afterwards so script execution can return to it.
   * - Use `currentTask()` to retrieve caller's task id.
   */
  public static function shutdown(int $skipTask = 1)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($skipTask) {
        $survivorId = ($skipTask === 1) ? Co::getUnique('parent') : $skipTask;

        $survivor = $scheduler->getTask($survivorId);
        $scheduler->shutdown($survivorId);

        if ($survivor instanceof TaskInterface) {
          $scheduler->schedule($survivor);
        }
      }
    );
  }

  /**
   * Registers the calling task as reader of the stream/socket, to wait until it is ready to be read from.
   * Optionally schedules the calling task to execute immediately/next, for third party code integration.
   *
   * @param resource $streamSocket
   * @param bool $immediately
   */
  public static function readWait($streamSocket, bool $immediately = false)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($streamSocket, $immediately) {
        $scheduler->addReader($streamSocket, $caller, false);
        if (!$immediately) {
          return;
        }

        $scheduler->schedule($caller);
      }
    );
  }

  /**
   * Registers the calling task as writer of the stream/socket, to wait until it is ready to be written to.
   * Optionally schedules the calling task to execute immediately/next, for third party code integration.
   *
   * @param resource $streamSocket
   * @param bool $immediately
   */
  public static function writeWait($streamSocket, bool $immediately = false)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($streamSocket, $immediately) {
        $scheduler->addWriter($streamSocket, $caller, false);
        if (!$immediately) {
          return;
        }

        $scheduler->schedule($caller);
      }
    );
  }

  /**
   * Add and wait for result of an blocking `I/O` `future` that runs in parallel.
   * This function turns the calling function internal state/type used by `gather()`
   * to **process/paralleled** which is handled differently.
   *
   * The calling task is not rescheduled by the request itself; it is resumed from one of the
   * handlers attached to the future (result, error, timeout or signal).
   *
   * - This function needs to be prefixed with `yield`
   *
   * @see https://docs.python.org/3.7/library/asyncio-subprocess.html#subprocesses
   * @see https://docs.python.org/3.7/library/asyncio-dev.html#running-blocking-code
   *
   * @param callable|shell $command
   * @param int|float|null $timeout The timeout in seconds or null to disable
   * @param bool $display set to show `future` output
   * @param Channeled|resource|mixed|null $channel IPC/CSP communication to be passed to the underlying `Future` instance.
   * @param int|null $channelTask The task id to use for realtime **future** output interaction.
   * @param int $signal `0` skips registering a signal handler.
   * @param int $signalTask The task to call when `future` is terminated with a signal,
   * defaults to the calling task when `null` and `$signal` is not `0`.
   * @param mixed $taskType stored on the calling task with `customState()`; replaced by
   * `[$channel, $channelTask]` when a `Channeled` channel and an integer `$channelTask` are given.
   *
   * @return mixed
   */
  public static function addFuture(
    $command,
    $timeout = 0,
    bool $display = false,
    $channel = null,
    $channelTask = null,
    int $signal = \SIGKILL,
    $signalTask = null,
    $taskType = null
  ) {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine)
      use ($command, $timeout, $display, $channel, $channelTask, $signal, $signalTask, $taskType) {
        $task->taskType('paralleled');
        $task->setState('process');
        $task->customState($taskType);
        // Each handler below calls `cancelProgress()`, records the final state
        // and resumes the waiting task with a value or an exception.
        $future = $coroutine->addFuture($command, $timeout, $display, $channel)
          ->then(function ($result) use ($task, $coroutine) {
            $coroutine->cancelProgress($task);
            $task->setState('completed');
            $task->sendValue($result);
            $coroutine->schedule($task);
          })
          ->catch(function (\Throwable $error) use ($task, $coroutine) {
            $coroutine->cancelProgress($task);
            $task->setState('erred');
            // Only the message survives; the original throwable type is replaced by `\RuntimeException`.
            $task->setException(new \RuntimeException($error->getMessage()));
            $coroutine->schedule($task);
          })
          ->timeout(function () use ($task, $coroutine, $timeout) {
            $coroutine->cancelProgress($task);
            $task->setState('cancelled');
            $task->setException(new TimeoutError($timeout));
            $coroutine->schedule($task);
          });

        // Keep the future reachable from the task (read back elsewhere with `getCustomData()`).
        $task->customData($future);

        // Without an explicit signal task, the waiting task itself handles the signal.
        if ($signal !== 0 && $signalTask === null)
          $signalTask = $task->taskId();

        if ($signal !== 0 && \is_int($signalTask)) {
          $future->signal($signal, function ($signaled)
          use ($task, $coroutine, $signal, $signalTask) {
            $coroutine->cancelProgress($task);
            $task->setState('signaled');
            $signaler = $coroutine->getTask($signalTask);
            if ($signaler instanceof TaskInterface) {
              if ($signaler->hasCaller()) {
                // The signal task has a caller: that caller is resumed with the error.
                // NOTE(review): `setCaller()` is reset on the caller itself, not on `$signaler` - confirm intended.
                $cancel = $signaler->getCaller();
                $cancel->setCaller();
                // A timer on either task turns the error into a `TaskTimeout`.
                if ($cancel->getTimer() || $signaler->getTimer())
                  $cancel->setException(new TaskTimeout(0.0));
                else
                  $cancel->setException(new CancelledError('Task ' . $signalTask . ' signal: ' . $signal));

                return $coroutine->schedule($cancel);
              } else {
                // No caller: the future's task gets the exception set (it is not scheduled here),
                // and the signal task is resumed with the handler's `$signaled` argument.
                $task->setException(new CancelledError('Task ' . $signalTask . ' signal: ' . $signal));
                $signaler->sendValue($signaled);
                $coroutine->schedule($signaler);
              }
            } else { // @codeCoverageIgnoreStart
              $task->setException(new \Exception(\sprintf('An unhandled signal received: %s', $signal)));
              $coroutine->schedule($task);
            } // @codeCoverageIgnoreEnd

          });
        }

        // Realtime output: every progress event is forwarded as `[$type, $data]` to the channel task,
        // looked up by id on each event and skipped when it is not a task.
        if ($channel instanceof Channeled && \is_int($channelTask)) {
          $channel->setFuture($future);
          $task->customState([$channel, $channelTask]);
          $future->progress(function ($type, $data)
          use ($coroutine, $channelTask) {
            $ipcTask = $coroutine->getTask($channelTask);
            if ($ipcTask instanceof TaskInterface) {
              $ipcTask->sendValue([$type, $data]);
              $coroutine->schedule($ipcTask);
            }
          });
        }
      }
    );
  }

  /**
   * Add and wait for result of an separate `thread` of execution.
   *
   * The calling task is marked as type `threaded` in state `process`, and is resumed from the
   * thread's result or error handler rather than by the request itself.
   *
   * - This function needs to be prefixed with `yield`
   *
   * @see https://docs.python.org/3.10/library/threading.html#module-threading
   *
   * @param callable $function
   * @param mixed $args
   *
   * @return mixed
   */
  public static function addThread(callable $function, ...$args)
  {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine)
      use ($function, $args) {
        $task->taskType('threaded');
        $task->setState('process');
        $thread = $coroutine->addThread($task->taskId(), $function, ...$args)
          ->then(function ($result) use ($task, $coroutine) {
            $task->setState('completed');
            $task->sendValue($result);
            $coroutine->schedule($task);
          })->catch(function (\Throwable $error) use ($task, $coroutine) {
            $message = $error->getMessage();
            // Cancellation is recognised solely by the text 'cancelled!' in the error message.
            $isCancelled = \strpos($message, 'cancelled!') !== false;
            $task->setState($isCancelled ? 'cancelled' : 'erred');
            if ($isCancelled) {
              if ($task->hasCaller()) {
                // The task has a caller: that caller is resumed with the error and this task is not scheduled.
                // NOTE(review): `setCaller()` is reset on the caller itself, not on `$task` - confirm intended.
                $cancel = $task->getCaller();
                $cancel->setCaller();
                // A timer on either task turns the cancellation into a `TaskTimeout`.
                if ($cancel->getTimer() || $task->getTimer())
                  $cancel->setException(new TaskTimeout(0.0));
                else
                  $cancel->setException(new TaskCancelled($message));

                return $coroutine->schedule($cancel);
              } else {
                $task->setException(new TaskCancelled($message));
              }
            } else {
              // Any other failure is passed on to the waiting task unchanged.
              $task->setException($error);
            }

            $coroutine->schedule($task);
          });

        // Keep the thread handle reachable from the task (read back elsewhere with `getCustomData()`).
        $task->customData($thread);
      }
    );
  }

  /**
   * Add/execute a blocking `I/O` `future` task that runs in parallel.
   * The `addFuture()` request is wrapped in its own task, whose `createTask()` result is sent
   * back to the caller immediately; use `gather()` to get the result.
   * - This function needs to be prefixed with `yield`
   *
   * @see https://docs.python.org/3.7/library/asyncio-subprocess.html#subprocesses
   * @see https://docs.python.org/3.7/library/asyncio-dev.html#running-blocking-code
   *
   * @param callable|shell $callable
   * @param int|float|null $timeout The timeout in seconds or null to disable
   * @param bool $display set to show future output
   * @param Channeled|resource|mixed|null $channel IPC/CSP communication to be passed to the underlying `Future` instance.
   * @param int|null $channelTask The task id to use for realtime **future** output interaction.
   * @param int $signal
   * @param int $signalTask The task to call when `future` is terminated with a signal.
   * @param mixed $taskType forwarded to `addFuture()`
   *
   * @return int
   */
  public static function spawnTask(
    $callable,
    $timeout = 0,
    bool $display = false,
    $channel = null,
    $channelTask = null,
    int $signal = 0,
    $signalTask = null,
    $taskType = 'yielded'

  ) {
    // Same order as the `addFuture()` parameter list.
    $futureArgs = [$callable, $timeout, $display, $channel, $channelTask, $signal, $signalTask, $taskType];

    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($futureArgs) {
        $wrapped = \awaitAble(function () use ($futureArgs) {
          return yield yield self::addFuture(...$futureArgs);
        });

        $spawnedId = $scheduler->createTask($wrapped);
        $caller->sendValue($spawnedId);
        $scheduler->schedule($caller);
      }
    );
  }

  /**
   * Stop/kill a `future` with `signal`, and also `cancel` the task.
   *
   * The future is only stopped when `$tid` resolves to a task whose custom data is a
   * `FutureInterface`; the scheduler's `cancelTask()` is called either way and its result
   * is sent back to the calling task.
   *
   * - This function needs to be prefixed with `yield`
   *
   * @param int $tid The task id of the `future` task.
   * @param int $signal `Termination/kill` signal constant.
   *
   * @return bool
   */
  public static function spawnKill(int $tid, int $signal = \SIGKILL)
  {
    return new self(
      function (TaskInterface $caller, CoroutineInterface $scheduler) use ($tid, $signal) {
        $target = $scheduler->getTask($tid);
        $future = ($target instanceof TaskInterface) ? $target->getCustomData() : null;
        if ($future instanceof FutureInterface) {
          $future->stop($signal);
        }

        $cancelled = $scheduler->cancelTask($tid);
        $caller->sendValue($cancelled);
        $scheduler->schedule($caller);
      }
    );
  }

  /**
   * Add a signal handler for the signal, that's continuously monitored.
   * This function will return `int` immediately, use with `spawn_signal()`.
   * - The monitoring task keeps receiving values until one is identical to `$signal`,
   * then finishes with the `$handler` result.
   * - Expect the `$handler` to receive `(int $signal)`.
   * - This function needs to be prefixed with `yield`
   *
   * @see https://docs.python.org/3/library/signal.html#signal.signal
   *
   * @param int $signal
   * @param callable $handler
   *
   * @return int
   */
  public static function signalTask(int $signal, callable $handler)
  {
    return self::away(function () use ($signal, $handler) {
      yield;

      // Keep waiting until the received value is exactly the monitored signal.
      do {
        $trapped = yield;
      } while ($trapped !== $signal);

      return $handler($signal);
    });
  }

  /**
   * Add a progress handler for the `future`, that's continuously monitored.
   * This function will return `int` immediately, use with `spawn_progress()`.
   * - The monitoring task loops forever; every received two-element array is unpacked as
   * `[$type, $data]`, and `$handler` runs whenever `$data` is not `null`.
   * - Expect the `$handler` to receive `(string $type, $data)`, where `$type` is either `out` or `err`.
   * - This function needs to be prefixed with `yield`
   *
   * @param callable $handler
   *
   * @return int
   */
  public static function progressTask(callable $handler)
  {
    return self::away(function () use ($handler) {
      yield;

      while (true) {
        $received = yield;

        // Anything that is not a two-element array is ignored.
        if (!\is_array($received) || \count($received) != 2) {
          continue;
        }

        [$type, $data] = $received;
        $received = null;
        if ($data !== null) {
          yield $handler($type, $data);
        }
      }
    });
  }

  /**
   * Run awaitable objects in the tasks set concurrently and block until the condition specified by race.
   *
   * Controls how the `gather()` function operates.
   * `gather_wait` will behave like **Promise** functions `All`, `Some`, `Any` in JavaScript.
   *
   * The three options are stored statically and consumed (then reset) by the next `gather()`
   * request that the scheduler processes.
   *
   * @param array<int|\Generator> $tasks - Only the values are used; array keys are ignored.
   * @param int $race - If set, initiate a competitive race between multiple tasks.
   * - When amount of tasks as completed, the `gather` will return with task results.
   * - When `0` (default), will wait for all to complete.
   * @param bool $exception - If `true` (default), the first raised exception is immediately
   *  propagated to the task that awaits on gather().
   * Other awaitables in the aws sequence won't be cancelled and will continue to run.
   * - If `false`, exceptions are treated the same as successful results, and aggregated in the result list.
   * @param bool $clear - If `true`, close/cancel remaining results, `false` (default)
   * @throws LengthException - If the number of tasks less than the desired $race count.
   *
   * @see https://docs.python.org/3.7/library/asyncio-task.html#waiting-primitives
   *
   * @return array associative `$taskId` => `$result`
   */
  public static function gatherWait(array $tasks, int $race = 0, bool $exception = true, bool $clear = false)
  {
    self::$gatherCount = $race;
    self::$gatherShouldError = $exception;
    self::$gatherShouldClearCancelled = $clear;

    // Unpack by position only. Spreading a string-keyed array is a fatal error before PHP 8.1
    // and turns the keys into named arguments from 8.1 on, which leaves `gather()` without the
    // index `0` it inspects.
    return self::gather(...\array_values($tasks));
  }

  /**
   * Allow passing custom functions to control how `gather()` reacts after task process state changes.
   * This is mainly used for third party integration without repeating `Gather` main functionality.
   *
   * Calling it without arguments restores the defaults (`'n/a'` and no callbacks).
   *
   * @param string $isCustomSate - custom status state to check on not started tasks
   * @param null|callable $onPreComplete - for already finished tasks
   * @param null|callable $onProcessing - for not running tasks
   * @param null|callable $onCompleted - for finished tasks
   * @param null|callable $onError - for erring or failing tasks
   * @param null|callable $onCancel - for aborted cancelled tasks
   * @param null|callable $onClear - for cleanup on tasks not to be used any longer
   */
  public static function gatherController(
    string $isCustomSate = 'n/a',
    ?callable $onPreComplete = null,
    ?callable $onProcessing = null,
    ?callable $onCompleted = null,
    ?callable $onError = null,
    ?callable $onCancel = null,
    ?callable $onClear = null
  ): void {
    self::$isCustomSate = $isCustomSate;

    // Store all hooks in one step, in the order of the parameter list.
    [
      self::$onPreComplete,
      self::$onProcessing,
      self::$onCompleted,
      self::$onError,
      self::$onCancel,
      self::$onClear,
    ] = [
      $onPreComplete,
      $onProcessing,
      $onCompleted,
      $onError,
      $onCancel,
      $onClear,
    ];
  }

  /**
   * Run awaitable objects in the taskId sequence concurrently.
   * If any awaitable in taskId is a coroutine, it is automatically scheduled as a Task.
   *
   * If all awaitables are completed successfully, the result is an aggregate list of returned values.
   * The order of result values corresponds to the order of awaitables in taskId.
   *
   * The first raised exception is immediately propagated to the task that awaits on gather().
   * Other awaitables in the sequence won't be cancelled and will continue to run.
   *
   * @see https://docs.python.org/3.7/library/asyncio-task.html#asyncio.gather
   * @source https://github.com/python/cpython/blob/11909c12c75a7f377460561abc97707a4006fc07/Lib/asyncio/tasks.py#L678
   *
   * @param int|array $taskId
   * @return array[] associative `$taskId` => `$result`
   */
  public static function gather(...$taskId)
  {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine) use ($taskId) {
        $gatherCount = self::$gatherCount;
        $gatherShouldError = self::$gatherShouldError;
        $gatherShouldClearCancelled = self::$gatherShouldClearCancelled;
        self::$gatherCount = 0;
        self::$gatherShouldError = true;
        self::$gatherShouldClearCancelled = false;

        $isCustomSate = self::$isCustomSate;
        $onPreComplete = self::$onPreComplete;
        $onProcessing = self::$onProcessing;
        $onCompleted = self::$onCompleted;
        $onError = self::$onError;
        $onCancel = self::$onCancel;
        $onClear = self::$onClear;
        self::gatherController();

        /**
         * @var TaskInterface[];
         */
        $taskIdList = [];
        $isGatherListGenerator = false;
        $gatherIdList = \is_array($taskId[0]) ? $taskId[0] : $taskId;
        foreach ($gatherIdList as $id => $value) {
          if ($value instanceof \Generator) {
            $isGatherListGenerator = true;
            $id = $coroutine->createTask($value);
            $taskIdList[$id] = $id;
          } elseif (\is_int($value)) {
            $taskIdList[$value] = $value;
          } else {
            \panic("Invalid access, only array of integers - `task Id`, or generator objects allowed!");
          }
        }

        if ($isGatherListGenerator) {
          $gatherIdList = \array_keys($taskIdList);
        }

        $results = [];
        $count = \count($taskIdList);
        $gatherSet = ($gatherCount > 0);
        if ($gatherSet) {
          if ($count < $gatherCount) {
            throw new LengthException(\sprintf('The (%d) tasks, not enough to fulfill the `race: (%d)` count!', $count, $gatherCount));
          }
        }

        $taskList = $coroutine->currentList();

        $completeList = $coroutine->completedList();
        $countComplete = \count($completeList);
        $gatherCompleteCount = 0;
        $isResultsException = false;

        foreach ($gatherIdList as $index => $tid) {
          if (isset($taskList[$tid]) || isset($completeList[$tid])) {
            // @codeCoverageIgnoreStart
            if (
              isset($taskList[$tid])
              && $taskList[$tid] instanceof TaskInterface
              && $taskList[$tid]->isStateless()
            ) {
              $count--;
              $results[$tid] = null;
              $gatherCompleteCount++;
              unset($taskList[$tid]);
              unset($taskIdList[$tid]);
              unset($gatherIdList[$index]);
            }
            // @codeCoverageIgnoreEnd

            continue;
          } else {
            $isResultsException = new InvalidStateError('Task ' . $tid . ' does not exists.');
            if ($gatherShouldError) {
              $countComplete = 0;
              break;
            } else {
              $results[$tid] = $isResultsException;
              $isResultsException = false;
              unset($gatherIdList[$index]);
            }
          }
        }

        // Check and handle tasks already completed before entering/executing gather().
        if ($countComplete > 0) {
          foreach ($completeList as $id => $tasks) {
            if (isset($taskIdList[$id])) {
              if (\is_callable($onPreComplete)) {
                $result = $onPreComplete($tasks);
              } else {
                $result = $tasks->result();
              }

              if ($result instanceof \Throwable) {
                $isResultsException = $result;
              } else {
                $results[$id] = $result;
              }

              $count--;
              $gatherCompleteCount++;
              unset($taskIdList[$id]);

              // Update running task list.
              $coroutine->updateCompleted($id, $completeList);

              // end loop, if gather race count reached
              if ($gatherCompleteCount == $gatherCount)
                break;
            }
          }
        }

        // Check and update base off gather race and completed count.
        if ($gatherSet) {
          $subCount = ($gatherCount - $gatherCompleteCount);
          if ($gatherCompleteCount != $gatherCount) {
            $count = $subCount;
          } elseif ($gatherCompleteCount == $gatherCount) {
            $count = 0;
          }
        }

        // Skip wait, just proceed to propagate/schedule the exception, if set.
        if ($gatherShouldError && ($isResultsException !== false)) {
          $count = 0;
        }

        // Run and wait until race or count is reached.
        while ($count > 0) {
          foreach ($taskIdList as $id) {
            if (isset($taskList[$id])) {
              $tasks = $taskList[$id];
              // Handle if parallel task, check already completed or has not started.
              if ($tasks->isParallel()) {
                $completeList = $coroutine->completedList();
                if (isset($completeList[$id])) {
                  $tasks = $completeList[$id];
                  $tasks->setState('completed');
                  $tasks->taskType('');
                  continue;
                }

                // Handle if future not running, force run.
                if ($tasks->isFuture()) {
                  $type = $tasks->getCustomState();
                  if (\is_string($type) && $type == 'signaling') {
                    $coroutine->execute('signaling');
                  } elseif (\is_string($type) && $type == 'yielded') {
                    $coroutine->execute(true);
                  } else {
                    $coroutine->execute('channeling');
                  }
                }
              }

              // Handle if any other task not running/pending, force run.
              if (
                $tasks->isCustomState($isCustomSate)
                || $tasks->isPending()
                || $tasks->isRescheduled()
              ) {
                if (\is_callable($onProcessing)) {
                  $onProcessing($tasks, $coroutine);
                } else {
                  try {
                    if (($tasks->isPending() || $tasks->isRescheduled()) && $tasks->isCustomState(true)) {
                      $tasks->customState();
                      $coroutine->schedule($tasks);
                      $tasks->run();
                      continue;
                    }

                    if ($tasks->isPending()) {
                      $coroutine->execute();
                    } elseif ($tasks->isRescheduled()) {
                      $coroutine->execute($tasks->getCycles() > 1);
                    }
                  } catch (\Throwable $error) {
                    $tasks->setState(
                      ($error instanceof CancelledError ? 'cancelled' : 'erred')
                    );

                    $tasks->setException($error);
                  }
                }
                // Handle if task finished.
              } elseif ($tasks->isCompleted()) {
                if (\is_callable($onCompleted)) {
                  $result = $onCompleted($tasks);
                } else {
                  $result = $tasks->result();
                }

                $count--;
                unset($taskList[$id]);
                $coroutine->updateCompleted($id);
                $results[$id] = $result;
                // end loop, if set and race count reached
                if ($gatherSet) {
                  $subCount--;
                  if ($subCount == 0)
                    break;
                }
                // Handle if task erred or cancelled.
              } elseif (
                $tasks->isErred()
                || $tasks->isCancelled()
                || $tasks->isSignaled()
              ) {
                if ($tasks->isErred() && \is_callable($onError)) {
                  $isResultsException = $onError($tasks);
                } elseif ($tasks->isCancelled() && \is_callable($onCancel)) {
                  $isResultsException = $onCancel($tasks);
                } else {
                  $isResultsException = $tasks->result();
                }

                $count--;
                unset($taskList[$id]);
                $coroutine->updateCompleted($id, $taskList, $onClear, false, true);
                // Check and propagate/schedule the exception.
                if ($gatherShouldError) {
                  $count = 0;
                  break;
                } else {
                  $results[$id] = $isResultsException;
                  $isResultsException = false;
                }
              }
            }
          }
        }

        // Check for, update and cancel/close any result not part of race gather count.
        if ($gatherSet && (\is_callable($onClear) || $gatherShouldClearCancelled) && ($isResultsException === false)) {
          $resultId = \array_keys($results);
          $abortList = \array_diff($gatherIdList, $resultId);
          $currentList = $coroutine->currentList();
          $finishedList = $coroutine->completedList();
          foreach ($abortList as $id) {
            if (isset($finishedList[$id])) {
              // Update task list removing tasks already completed that will not be used, mark and execute any custom update/cancel routines
              $coroutine->updateCompleted($id, $finishedList, $onClear);
            } elseif (isset($currentList[$id])) {
              // Update task list removing current running tasks not part of race gather count, mark and execute any custom update, then cancel routine
              $coroutine->updateCompleted($id, $currentList, $onClear, true);
            }
          }
        }

        if ($gatherShouldError && ($isResultsException !== false)) {
          $task->setException($isResultsException);
        } else {
          $task->sendValue($results);
        }

        $coroutine->schedule($task);
      }
    );
  }

  /**
   * Block/sleep for delay seconds.
   * Suspends the calling task, allowing other tasks to run.
   *
   * @see https://docs.python.org/3.9/library/asyncio-task.html#sleeping
   * @source https://github.com/python/cpython/blob/11909c12c75a7f377460561abc97707a4006fc07/Lib/asyncio/tasks.py#L593
   *
   * @param float $delay
   * @param mixed $result - If provided, it is returned to the caller when the coroutine completes.
   */
  public static function sleepFor(float $delay = 0.0, $result = null)
  {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine) use ($delay, $result) {
        // Register a timeout callback with the scheduler for `$delay`, keyed by this task's id.
        $coroutine->addTimeout(function () use ($task, $coroutine, $result) {
          $task->setTimer();

          // Only `null` means "no result supplied". An `empty()` check here would
          // silently drop legitimate falsy results such as `0`, `false`, `''` or `[]`.
          if ($result !== null)
            $task->sendValue($result);

          // Resume the task that was suspended on this kernel call.
          $coroutine->schedule($task);
        }, $delay, $task->taskId());
      }
    );
  }

  /**
   * Wait for the `callable` to complete with a timeout.
   *
   * @see https://docs.python.org/3.10/library/asyncio-task.html#timeouts
   * @source https://github.com/python/cpython/blob/bb0b5c12419b8fa657c96185d62212aea975f500/Lib/asyncio/tasks.py#L392
   *
   * @param \Generator|callable $callable
   * @param float|null $timeout
   * @return mixed
   * @throws TimeoutError If a timeout occurred into `current` task.
   * @throws CancelledError If a timeout occurred into `callable` task.
   */
  public static function waitFor($callable, ?float $timeout = null)
  {
    return new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine) use ($callable, $timeout) {
        // Run the work as its own task; anything that is not already a Generator
        // is wrapped by `awaitAble()` first.
        if ($callable instanceof \Generator) {
          $taskId = $coroutine->createTask($callable);
        } else {
          $taskId = $coroutine->createTask(\awaitAble($callable));
        }

        $coroutine->addTimeout(function () use ($taskId, $timeout, $task, $coroutine) {
          $task->setTimer();

          if ($coroutine->isCompleted($taskId)) {
            // The awaited task already finished: hand its result back instead of
            // reporting a timeout that did not actually affect it.
            $tasks = $coroutine->getCompleted($taskId);
            $result = $tasks->result();
            $coroutine->updateCompleted($taskId);
            $task->sendValue($result);
          } elseif (!empty($timeout)) {
            // Still unfinished with a real timeout set: inject `CancelledError` into
            // the awaited task and raise `TimeoutError` in the waiting task.
            $cancelTask = $coroutine->getTask($taskId);
            if ($cancelTask instanceof TaskInterface)
              $cancelTask->setException(new CancelledError("Task {$taskId}!"));

            $task->setException(new TimeoutError($timeout));
          }

          // In every case the waiting task is resumed.
          $coroutine->schedule($task);
        }, $timeout, $task->taskId());
      }
    );
  }

  /**
   * Any blocking operation can be cancelled by a timeout.
   * Throws a `TaskTimeout` exception in the calling task after seconds have elapsed.
   * This function may be used in two ways. You can apply it to the execution of a single coroutine:
   *
   *```php
   *         yield timeout_after(seconds, coro(args))
   *
   * # Or you can use it as an asynchronous context manager to apply a timeout to a block of statements:
   *
   *         async_with(timeout_after(seconds));
   *               // Or
   *         yield with(timeout_after(seconds));
   *            yield coro1(args)
   *            yield coro2(args)
   *            ...
   *```
   * - This function needs to be prefixed with `yield`
   *
   * @param float $timeout
   * @param Generator|callable $callable
   * @param mixed ...$args
   * @return mixed
   * @throws TaskTimeout If a timeout has occurred.
   * @see https://curio.readthedocs.io/en/latest/reference.html#timeout_after
   * @source https://github.com/dabeaz/curio/blob/27ccf4d130dd8c048e28bd15a22015bce3f55d53/curio/time.py#L141
   */
  public static function timeoutAfter(float $timeout = 0.0, $callable = null, ...$args)
  {
    // No (truthy) callable given: hand back a `TimeoutAfter` instance for use
    // with `async_with()` / `with()`.
    if (!$callable) {
      return new TimeoutAfter($timeout);
    }

    // A callable was supplied: run it under the timeout.
    return self::__timeoutAfter($timeout, $callable, ...$args);
  }

  /**
   * Runs `$callable` as its own task and registers a timeout for it against the calling task.
   *
   * This is a generator: it yields a `Kernel` instance, yields again whatever value it was
   * resumed with, and finally returns the value it is resumed with the second time.
   *
   * @param float $timeout value handed to `addTimeout()` and to the `TaskTimeout` exception
   * @param \Generator|callable|null $callable work to run; a Generator is used as is, a callable is wrapped by `awaitAble()`
   * @param mixed ...$args forwarded to `awaitAble()` when `$callable` is a plain callable
   * @return \Generator
   */
  protected static function __timeoutAfter(float $timeout = 0.0, $callable = null, ...$args)
  {
    return yield yield new Kernel(
      function (TaskInterface $task, CoroutineInterface $coroutine) use ($callable, $timeout, $args) {
        // When true, the explicit `schedule($callableTask)` at the end of this closure is skipped.
        $skip = false;
        if ($callable instanceof \Generator) {
          $taskId = $coroutine->createTask($callable);
        } elseif (\is_callable($callable)) {
          // `\run_in_process` / `\run_in_thread` are referenced as global constants here.
          if ($callable === \run_in_process || $callable === \run_in_thread)
            $skip = true;

          $taskId = $coroutine->createTask(\awaitAble($callable, ...$args));
        }

        // NOTE(review): `$taskId` is never assigned when `$callable` is neither a Generator
        // nor callable, yet it is used below; confirm callers always pass a valid callable.
        $callableTask = $coroutine->getTask($taskId);
        $coroutine->addTimeout(function () use ($timeout, $task, $coroutine, $taskId) {
          $task->setTimer();
          // Looked up again here; the outer `$callableTask` is not captured by this closure.
          $callableTask = $coroutine->getTask($taskId);
          if (!$coroutine->isCompleted($taskId)) {
            // NOTE(review): assumes `getTask()` still returns a task object for an uncompleted id — confirm.
            $futureThread = $callableTask->getCustomData();
            if ($futureThread instanceof FutureInterface || $futureThread instanceof Thread || $futureThread instanceof TWorker) {
              $callableTask->setCaller($task);
              if ($futureThread instanceof FutureInterface) {
                $delay = 1;
                $futureThread->stop(\SIGKILL);
              } else {
                $delay = 2;
                $futureThread->cancel($taskId);
              }

              // Raises `TaskTimeout` in the calling task and resumes it.
              $canceled = function () use ($task, $timeout, $coroutine) {
                $task->setException(new TaskTimeout($timeout));
                $coroutine->schedule($task);
              };

              // Early return: the calling task is not scheduled here, only from `$canceled`
              // (presumably invoked by `delayer()` after `$delay` — TODO confirm).
              return $coroutine->createTask(\delayer($delay, $canceled));
            } else {
              $coroutine->schedule($callableTask);
              // Presumably calls `$coroutine->cancelTask($taskId)` after a delay of 2 — TODO confirm `delayer()` semantics.
              $coroutine->createTask(\delayer(2, [$coroutine, 'cancelTask'], $taskId));
            }

            $task->setException(new TaskTimeout($timeout));
            if ($task->hasCaller()) {
              $caller = $task->getCaller();
              // Called without arguments; presumably clears the stored caller — TODO confirm.
              $task->setCaller();
              $coroutine->schedule($caller);
            }
          } else {
            // The callable's task already completed: pass its result to the calling task.
            $completed = $coroutine->getCompleted($taskId);
            $result = $completed->result();
            $coroutine->updateCompleted($taskId);
            $task->sendValue($result);
          }

          $coroutine->schedule($task);
        }, $timeout, $task->taskId());

        if (!$skip)
          $coroutine->schedule($callableTask);
      }
    );
  }

  /**
   * Begins an asynchronous context manager that is able to suspend execution in its `__enter()` and `__exit()` methods.
   * It is an **Error** to use `async_with` outside of an `async` function.
   *
   * @see https://book.pythontips.com/en/latest/context_managers.html
   *
   * @param ContextInterface|resource $context
   * @param ContainerInterface|object|ContextInterface|null $object
   * @param array[] $options
   * @return ContextInterface
   * @throws Panic if no context instance, or `__enter()` method does not return `true`.
   */
  public static function asyncWith($context = null, $object = null, array $options = [])
  {
    // `$di` is what gets handed to `Contextify` for resource contexts: the raw options
    // by default, or the supplied non-context object / resolved container entry.
    $di = $options;
    if (\is_object($object) && !$object instanceof ContextInterface) {
      $inject = $di = $object;
    }

    // @codeCoverageIgnoreStart
    if (
      (!empty($options) && isset($inject))
      && ($inject instanceof ContainerInterface)
    ) {
      if ($inject instanceof InjectionInterface) {
        // The last option is treated as an `identifier => parameters` map; every
        // option before it is registered on the container via `set()`.
        $last = \array_pop($options);
        foreach ($options as $className => $friendlyName)
          $inject->set($className, $friendlyName);

        if (\is_array($last)) {
          foreach ($last as $identifier => $parameters) {
            $di = $inject->get($identifier, $parameters);
            if (\is_object($di))
              break;
          }
        }
      } else {
        // Use the first option as the entry identifier. `array_unshift()` was used here
        // before, which returns the element count instead of the first element.
        $identifier = \array_shift($options);
        $di = $inject->get((string) $identifier);
      }
    }

    if (\is_resource($context)) {
      $context = new Contextify($context, $di);
    }

    if ($object instanceof ContextInterface) {
      yield \ending($object);
    }
    // @codeCoverageIgnoreEnd


    if ($context instanceof ContextInterface) {
      if ($context instanceof Semaphore) {
        yield $context->acquire();
      }

      try {
        yield $context();
      } finally {
        // A `return` inside `finally` discards any exception raised by the `yield` above,
        // so a context that reports itself as entered is always handed back.
        if ($context->entered())
          return $context;

        \panic('No valid context manager found!');
      }
    }
  }

  /**
   * Begins an asynchronous context manager that is able to suspend execution in its `__enter()` and `__exit()` methods.
   * It is an **Error** to use `with` outside of an `async` function.
   * - This function needs to be prefixed with `yield`
   *
   * @see https://book.pythontips.com/en/latest/context_managers.html
   *
   * @param ContextInterface|resource $context
   * @param \Closure $as - Will receive a **ContextInterface** instance, when finish will execute `__exit()`, the `ending()` function.
   * @return ContextInterface
   * @throws Panic if no context instance, or `__enter()` method does not return `true`.
   */
  public static function with($context = null, ?\Closure $as = null)
  {
    // Explicit `?\Closure` replaces the implicit nullable type, which PHP 8.4 deprecates.
    $task = \coroutine()->getTask(yield \current_task());
    // @codeCoverageIgnoreStart
    if (\is_resource($context)) {
      $context = new Contextify($context);
      // @codeCoverageIgnoreEnd
    } elseif ($task->hasWith()) {
      // The current task already carries a `with` context: store the new one on the
      // task and end the previous one first.
      $contextTask = $task->getWith();
      $task->setWith($context);
      yield \ending($contextTask);
    }

    if ($context instanceof ContextInterface) {
      // Invoke the context, then verify it reports itself as entered.
      yield $context();

      if (!$context->entered())
        \panic('No valid context manager found!');

      if ($context instanceof Semaphore)
        yield $context->acquire();

      try {
        if ($as) {
          // Run the user block, then invoke the context again and release a held semaphore.
          yield $as($context);
          try {
            yield $context();

            if ($context instanceof Semaphore)
              yield $context->release();
          } finally {
            // Force an exit with a `Panic` if the context did not report itself as exited.
            if (!$context->exited())
              $context->__exit(new Panic('Context block failed to exit!'));
          }
        }
      } finally {
        // A `return` inside `finally` always hands the context back and discards any
        // exception raised inside the block above.
        return $context;
      }
    }
  }

  /**
   * Allows convenient iteration over asynchronous `Iterator`.
   * This will obtain `task` results in the order that they complete, as they complete.
   * - Only `current()`, and `valid()` _methods_ SHOULD BE *implemented* in `Iterator` .
   * - This function needs to be prefixed with `yield`
   *
   * @param AsyncIterator $task A `task` producing _results_ in **chunks**.
   * @param \Closure $as Will **receive** _chunk_ of a _task_ `result` for processing.
   * @return void
   * @see https://docs.python.org/3/reference/compound_stmts.html#the-async-for-statement
   * @see https://docs.python.org/3.10/reference/expressions.html#asynchronous-generator-functions
   */
  public static function asyncFor(AsyncIterator $task, \Closure $as)
  {
    while (true) {
      // Reset on every pass: if the `yield` below throws, `finally` must not read an
      // undefined variable (first pass) or re-process the previous chunk (later passes).
      $item = null;
      try {
        $item = yield $task->current();
      } finally {
        // `null` chunks are never forwarded to the handler.
        if ($item !== null)
          yield $as($item);
      }

      if (!$task->valid())
        break;
    }

    // Iterators that are also context managers are ended once iteration stops.
    if ($task instanceof ContextInterface)
      yield \ending($task);
  }

  /**
   * Makes a resolvable function from the `label` name that is callable with `coroutine_run()`, `go()`,
   * `await()`, `away()`, `spawner()`, and in turn calls **create_task()**.
   * The passed in `function` is wrapped to be `awaitAble`. The `label` will be `Define()`d, making that _name_ a **global** `constant`.
   *
   * - This will store a closure in `Co` static class with supplied `label` name as key.
   * @see https://docs.python.org/3.10/reference/compound_stmts.html#async-def
   *
   * @param string $label
   * @param callable $function
   * @return void
   * @throws Panic — if the **named** `label` function already exists.
   */
  public static function async(string $label, callable $function): void
  {
    // Generator wrapper registered under `$label`; it yields once before invoking the user function.
    $wrapper = function (...$args) use ($function) {
      yield;
      $scheduler = \coroutine();

      try {
        $awaitable = $function(...$args);
        // A function that produced nothing has nothing to await.
        if (null === $awaitable) {
          return;
        }

        $value = yield $awaitable;
      } catch (\Throwable $failure) {
        // Delegate failure handling to `throwable()`.
        return yield self::throwable($failure, $scheduler);
      }

      // Store the outcome on the current task when one can be resolved.
      $current = $scheduler->getTask(yield \current_task());
      if ($current instanceof TaskInterface) {
        $current->setResult($value);
      }

      return $value;
    };

    Co::addFunction($label, $wrapper);
  }

  /**
   * Records `$error` on the current task and forwards it to the task's caller, or to its
   * `with` context / group, before rescheduling the task.
   *
   * @param \Throwable $error the failure caught while running an `async` function
   * @param CoroutineInterface $coroutine scheduler used to look up and schedule tasks
   * @return \Generator
   */
  protected static function throwable(\Throwable $error, CoroutineInterface $coroutine)
  {
    yield;
    $current = $coroutine->getTask(yield \current_task());

    // Cancellation gets its own state; every other failure is `erred`.
    $state = 'erred';
    if ($error instanceof CancelledError) {
      $state = 'cancelled';
    }
    $current->setState($state);

    $context = $current->getWith();
    $parentClass = \get_parent_class($error);
    $isParentError = $parentClass === false
      || \strpos($parentClass, 'Async\\') !== false
      || $parentClass === 'Exception';

    if ($current->hasCaller()) {
      $caller = $current->getCaller();
      // Errors with any other parent class are re-wrapped as a plain `\Error` for the caller.
      if (!$isParentError) {
        $error = new \Error($error->getMessage(), $error->getCode(), $error->getPrevious());
      }

      $caller->setException($error);
      $coroutine->schedule($caller);
    } elseif (
      ($context instanceof ContextInterface && $context->isWith() && $context->withTask() === $current)
      || $current->hasGroup()
    ) {
      /** @var TaskGroup|ContextInterface */
      $instance = $current->hasGroup() ? $current->getGroup() : $context;
      try {
        yield $instance->__exit($error);
      } catch (\Throwable $ignored) {
        // Anything thrown while exiting is deliberately swallowed.
      }

      if ($current->hasGroup()) {
        $instance->task_done($current, true, $error);
      }
    }

    // Query self-cancellation before storing the exception, matching the original call order.
    $selfCancelled = $current->isSelfCancellation();
    $current->setException($error);
    if ($selfCancelled) {
      return yield yield $coroutine->schedule($current);
    }

    return $coroutine->schedule($current);
  }

  /**
   * This function will `pause` and execute the `label` function, with `arguments`,
   * only functions created with `async`, or some **reserved**,  or
   * a `PHP` builtin callable will work, anything else will throw `Panic` exception.
   * If `label` is a `PHP` builtin _command/function_ it will execute asynchronously in a **child/subprocess**,
   * by `proc_open`, or `uv_spawn` if **libuv** is loaded.
   *
   * - This function needs to be prefixed with `yield`
   *
   * @see https://www.python.org/dev/peps/pep-0492/#id56
   * @see https://docs.python.org/3.10/reference/expressions.html#await
   *
   * @param string $label `async` function, **reserved**  or `PHP` builtin function.
   * @param mixed ...$arguments
   * @return mixed
   * @throws Panic if the **named** `label` function does not exists.
   */
  public static function await(string $label, ...$arguments)
  {
    // Reserved labels map straight onto kernel calls.
    switch ($label) {
      case 'sleep':
      case 'sleep_for':
        return yield Kernel::sleepFor(...$arguments);
      case 'cancel':
        return yield Kernel::cancelTask(...$arguments);
      case 'join':
      case 'join_task':
        $tid = \array_shift($arguments);
        return yield Kernel::joinTask($tid);
      case 'spawn':
      case 'spawner':
        $async = \array_shift($arguments);
        return yield Kernel::away($async, ...$arguments);
      case 'gather':
        return yield Kernel::gather(...$arguments);
    }

    // Functions registered through `async()`.
    if (Co::isFunction($label)) {
      return yield Co::getFunction($label)(...$arguments);
    }

    // A `*` at position 0 selects `spawnTask()`; a `*` anywhere after it sets `$display`.
    // The empty-string guard matters on PHP 8, where `strpos()` with an offset beyond the
    // string length throws `ValueError` instead of reaching the `Panic` below.
    $display = $label !== '' && \strpos($label, '*', 1) !== false;
    $away = \strpos($label, '*', 0);
    $label = \str_replace(['*', ' '], '', $label);
    $function = 'Async\Path\file_' . \str_replace(['file_', ' '], '', $label);
    $file_function = 'Async\Path\file_' . $label;
    if (\in_array($label, ['watch_dir', 'watch_task', 'watch', 'watch_file'], true)) {
      $misc_function = 'Async\Path\\' . $label;
      if (\is_callable($misc_function))
        return yield $misc_function(...$arguments);
    }

    // Try the `Async\Path\file_*` helpers, with and without a `file_` prefix already in the label.
    if (\is_callable($function))
      return yield $function(...$arguments);
    elseif (\is_callable($file_function))
      return yield $file_function(...$arguments);

    // Any other callable name is wrapped and run via `spawnTask()` or `addFuture()`.
    if (\is_callable($label)) {
      // @codeCoverageIgnoreStart
      $system = function () use ($label, $arguments) {
        return @$label(...$arguments);
      };
      // @codeCoverageIgnoreEnd

      if ($away === 0)
        return yield Kernel::spawnTask($system, 0, $display);

      return yield awaitable_future(function () use ($system, $display) {
        return Kernel::addFuture($system, 0, $display);
      });
    }

    \panic("No function named: '{$label}' exists!");
  }

  /**
   * **Schedule** an `async`, a coroutine _function_ for execution.
   * - This function needs to be prefixed with `yield`
   *
   * @see https://curio.readthedocs.io/en/latest/reference.html#tasks
   * @see https://docs.python.org/3.10/library/asyncio-task.html#creating-tasks
   * @source https://github.com/python/cpython/blob/11909c12c75a7f377460561abc97707a4006fc07/Lib/asyncio/tasks.py#L331
   *
   * @param generator|callable|string $label - `async`, a coroutine, or a function to make `awaitable`
   * @param mixed ...$args - if **$label** is `Generator`, $args can hold `customState`, and `customData`
   * - for third party code integration.
   *
   * @return int $task id
   */
  public static function away($label, ...$args)
  {
    // A label registered through `async()` is resolved and turned into a task directly.
    if (\is_string($label) && Co::isFunction($label)) {
      $generator = Co::getFunction($label)(...$args);

      return Kernel::createTask($generator, true);
    }

    $spawn = function ($task, CoroutineInterface $coroutine) use ($label, $args) {
      if (!($label instanceof \Generator)) {
        // Not a Generator: wrap it so it becomes awaitable, then create its task.
        $tid = $coroutine->createTask(\awaitAble($label, ...$args));
      } else {
        $tid = $coroutine->createTask($label);
        if (!empty($args)) {
          // For generators, the extra arguments carry a custom state and optional custom data.
          $created = $coroutine->getTask($tid);
          $wantsTrue = ($args[0] === 'true') || ($args[0] === true);
          $created->customState($wantsTrue ? true : $args[0]);

          if (isset($args[1])) {
            $created->customData($args[1]);
          }
        }
      }

      // The caller receives the new task id and is resumed on the matching queue.
      $task->sendValue($tid);
      if ($coroutine->isFiber($task)) {
        $coroutine->scheduleFiber($task);
      } else {
        $coroutine->schedule($task);
      }
    };

    return new Kernel($spawn);
  }

  /**
   * Marks the running fiber as `suspended` and resumes its caller with `$data`.
   *
   * @param mixed $data value sent to the fiber's caller
   * @return Kernel
   */
  public static function suspendFiber($data)
  {
    $suspend = function (FiberInterface $fiber, CoroutineInterface $coroutine) use ($data) {
      $fiber->setState('suspended');

      $target = $fiber->getCaller();
      $target->sendValue($data);

      // The caller may be a fiber or a regular task; schedule it accordingly.
      if ($coroutine->isFiber($target)) {
        $coroutine->scheduleFiber($target);
      } else {
        $coroutine->schedule($target);
      }
    };

    return new Kernel($suspend);
  }

  /**
   * Records the calling task on `$fiber` and schedules the fiber.
   *
   * @param FiberInterface $fiber the fiber to start
   * @return Kernel
   */
  public static function startFiber(FiberInterface $fiber)
  {
    $start = function ($caller, CoroutineInterface $coroutine) use ($fiber) {
      $fiber->setCaller($caller);
      $coroutine->scheduleFiber($fiber);
    };

    return new Kernel($start);
  }

  /**
   * Marks `$fiber` as `rescheduled`, sends it `$data`, and schedules it with the calling task recorded as caller.
   *
   * @param FiberInterface $fiber the fiber to resume
   * @param mixed $data value sent into the fiber
   * @return Kernel
   */
  public static function resumeFiber(FiberInterface $fiber, $data)
  {
    $resume = function ($caller, CoroutineInterface $coroutine) use ($fiber, $data) {
      $fiber->setState('rescheduled');
      $fiber->setCaller($caller);
      $fiber->sendValue($data);
      $coroutine->scheduleFiber($fiber);
    };

    return new Kernel($resume);
  }

  /**
   * Marks `$fiber` as `erred`, sets `$exception` on it, and schedules it with the calling task recorded as caller.
   *
   * @param FiberInterface $fiber the fiber to receive the exception
   * @param mixed $exception value passed to the fiber's `setException()`
   * @return Kernel
   */
  public static function throwFiber(FiberInterface $fiber, $exception)
  {
    $raise = function ($caller, CoroutineInterface $coroutine) use ($fiber, $exception) {
      $fiber->setState('erred');
      $fiber->setCaller($caller);
      $fiber->setException($exception);
      $coroutine->scheduleFiber($fiber);
    };

    return new Kernel($raise);
  }

  /**
   * Returns the _result_ of a completed `task`.
   *
   * @param integer $tid task id instance
   * @return mixed
   * @throws Exception|Error if _task_ `erred`.
   * @throws InvalidStateError if still `running`, not terminated.
   */
  public static function resultFor(int $tid)
  {
    $scheduler = \coroutine();
    $outcome = null;

    if ($scheduler->isGroup($tid)) {
      $outcome = $scheduler->getGroupResult($tid);
    } elseif ($scheduler->isCompleted($tid)) {
      // Read the result first, then update the scheduler's completed list for this id.
      $finished = $scheduler->getCompleted($tid);
      $outcome = $finished->result();
      $scheduler->updateCompleted($tid);
    } elseif ($scheduler->getTask($tid)) {
      // The task exists but is neither a group nor completed.
      throw new InvalidStateError("{$tid}");
    }

    // A stored throwable is re-thrown instead of being returned.
    if (!($outcome instanceof \Throwable)) {
      return $outcome;
    }

    throw $outcome;
  }

  /**
   * Returns the _exception_ of a `task`.
   *
   * @param integer $tid task id instance
   * @return null|Throwable
   * @throws InvalidStateError if _task_ still `running`, not terminated.
   */
  public static function exceptionFor(int $tid): ?\Throwable
  {
    $scheduler = \coroutine();
    $found = null;

    if ($scheduler->isGroup($tid)) {
      $found = $scheduler->getGroupResult($tid);
    } elseif ($scheduler->isCompleted($tid)) {
      // Read the exception first, then update the scheduler's completed list for this id.
      $finished = $scheduler->getCompleted($tid);
      $found = $finished->exception();
      $scheduler->updateCompleted($tid);
    } elseif ($scheduler->getTask($tid)) {
      // The task exists but is neither a group nor completed.
      throw new InvalidStateError("{$tid}");
    }

    // Anything that is not a throwable is reported as "no exception".
    if ($found instanceof \Throwable) {
      return $found;
    }

    return null;
  }
}