Coroutine/Coroutine.php
<?php
declare(strict_types=1);
namespace Async;
use Async\Spawn\Future;
use Async\Spawn\FutureHandler;
use Async\Spawn\FutureInterface;
use Async\Spawn\ChanneledInterface;
use Async\Threads\Thread;
use Async\Threads\TWorker;
use Async\Kernel;
use Async\Task;
use Async\Parallel;
use Async\ParallelInterface;
use Async\Signaler;
use Async\TaskInterface;
use Async\ReturnValue;
use Async\PlainValue;
use Async\CoroutineInterface;
use Async\CancelledError;
use Async\InvalidArgumentException;
use Async\Fiber;
use Async\FiberInterface;
/**
* The Scheduler
*
* @see https://docs.python.org/3/library/asyncio-task.html#coroutines
* @see https://curio.readthedocs.io/en/latest/reference.html#coroutines
*/
final class Coroutine implements CoroutineInterface
{
/**
* checker for main supervisor task running state
*
* @var boolean
*/
protected $ioStarted = false;
/**
* a task's unique id number
*
* @var int
*/
protected $maxTaskId = 0;
/**
* List of currently running tasks
*
* @var array[] [taskId => task]
*/
protected $taskMap = [];
/**
* List of completed tasks
*
* @var TaskInterface[]|FiberInterface[]
*/
protected $completedMap = [];
/**
* List of `TaskGroup` Results
*
* @var array[] [taskId => result]
*/
protected $taskGroupMap = [];
/**
* List of `cancelled` tasks
*
* @var array[] [taskId => true]
*/
protected $cancelledMap = [];
/**
* Queue of `Task`, holding all created `coroutines/generators`
*
* @var \SplQueue<TaskInterface|FiberInterface|\Generator>
*/
protected $taskQueue;
/**
* A list of timers, or **UV** timer handles, added by `addTimeout`.
*
* @var array
*/
protected $timers = [];
/**
* Combined list of readable `id` of socket/streams/events, and read callbacks.
*
* @var resource[] [id, tasks]
*/
protected $waitingForRead = [];
/**
* Combined list of writable `id` of socket/streams/events, and write callbacks.
*
* @var resource[] [id, tasks]
*/
protected $waitingForWrite = [];
/**
* The **UV** event loop instance,
* If not set, will use PHP built-in `stream_select`
*
* @var \UVLoop
*/
protected $uv;
/**
* The **UV** Stream/Socket/FD event callback
*
* @var callable
*/
protected $onEvent;
/**
* The **UV** timer event callback
*
* @var callable
*/
protected $onTimer;
/**
* The **UV** signal event callback
*
* @var callable
*/
protected $onSignal;
/**
* Check for `libuv` UV Signal feature, mainly for Windows.
*
* @var bool
*/
protected $isUvSignal;
/**
* Check/counter for `libuv` UV File System feature.
*
* @var int
*/
protected $uvFileSystem = 0;
/**
* Check/counter for `libuv` for all Network I/O features
* **UVTcp**, **UVUdp**, **UVPipe**, etc...
*
* @var int
*/
protected $uvNetwork = 0;
/**
* list of **UV** event handles, added by `addReader`, `addWriter`
*
* @var \UV[]
*/
protected $events = [];
/**
* list of **UV** signal handles, added by `addSignal`, `removeSignal`
*
* @var \UVSignal[]
*/
protected $signals = [];
/**
* @var FutureHandler
*/
protected $future = null;
/**
* @var Parallel
*/
protected $parallel;
/**
* @var Thread
*/
protected $thread;
/**
* @var Signaler
*/
protected $signaler;
/**
* Check for prefer high-resolution timer, available as of PHP 7.3+
*
* @var bool
*/
protected $isHighTimer;
protected $isFutureActive = false;
protected $channelCounter = null;
public function __destruct()
{
$this->shutdown(0);
unset($this->taskQueue);
$this->taskQueue = null;
}
public function close()
{
if ($this->uv instanceof \UVLoop) {
foreach ($this->timers as $timer) {
if ($timer instanceof \UVTimer && \uv_is_active($timer))
\uv_timer_stop($timer);
}
foreach ($this->signals as $signal) {
if ($signal instanceof \UVSignal && \uv_is_active($signal))
\uv_signal_stop($signal);
}
foreach ($this->events as $event) {
if ($event instanceof \UV && \uv_is_active($event))
\uv_close($event);
}
if ($this->thread instanceof Thread) {
$this->thread->close();
}
@\uv_stop($this->uv);
@\uv_run($this->uv, \UV::RUN_NOWAIT);
@\uv_loop_delete($this->uv);
}
if ($this->parallel instanceof ParallelInterface) {
$this->parallel->close();
}
$this->uv = null;
$this->parallel = null;
$this->thread = null;
unset($this->future);
$this->future = null;
unset($this->signaler);
$this->signaler = null;
$this->onEvent = null;
$this->onTimer = null;
$this->onSignal = null;
$this->isUvSignal = null;
$this->isHighTimer = null;
$this->maxTaskId = 0;
$this->uvFileSystem = 0;
$this->taskMap = [];
$this->completedMap = [];
$this->taskGroupMap = null;
$this->cancelledMap = null;
$this->timers = [];
$this->waitingForRead = [];
$this->waitingForWrite = [];
$this->events = [];
$this->signals = [];
$this->ioStarted = false;
}
/**
* This scheduler will detect if the [`ext-uv` PECL extension](https://pecl.php.net/package/uv) is
* installed, which provides an interface to `libuv` library. An native like **PHP** event loop engine.
* - To manually turn off `libuv` use: `->setup(false);`
*
* @see https://github.com/amphp/ext-uv
*/
public function __construct()
{
$this->maxTaskId = Co::getUnique('dirty') === 1 ? Co::getUnique('max') : \random_int(10000, 9999999999);
Co::reset();
Co::setLoop($this);
$this->initSignals();
if (\IS_UV) {
$this->uv = \uv_loop_new();
$co = $this;
$channelLoop = function ($wait_count) use (&$co) {
$co->channelCounter = $wait_count;
$co->futureOn();
$co->run();
$co->futureOff();
$co->channelCounter = null;
};
Future::setChannelTick($channelLoop);
\spawn_setup($this->uv);
\uv_native(true);
$this->onEvent = function ($event, $status, $events, $stream) {
if ($status !== 0) {
$this->pollEvent($stream);
if ($events === 0) {
$events = \UV::READABLE | \UV::WRITABLE;
}
}
if (isset($this->waitingForRead[(int) $stream]) && ($events & \UV::READABLE)) {
$this->updateScheduler('read', $stream, true);
}
if (isset($this->waitingForWrite[(int) $stream]) && ($events & \UV::WRITABLE)) {
$this->updateScheduler('write', $stream, true);
}
};
$this->onTimer = function ($timer) {
$taskTimer = $this->timers[(int) $timer];
@\uv_timer_stop($timer);
\uv_unref($timer);
unset($this->timers[(int) $timer]);
$this->executeTask($taskTimer[1], $timer);
};
if (\IS_THREADED_UV)
$this->thread = new Thread($this, $this->uv);
}
if ($this->isHighTimer = \function_exists('hrtime'))
Co::setTiming('hrtime', true);
$this->parallel = new Parallel($this);
$this->future = $this->parallel->getFutureHandler();
$this->taskQueue = new \SplQueue();
}
protected function timestamp(): float
{
return (float) ($this->isHighTimer ? \hrtime(true) / 1e+9 : \microtime(true));
}
protected function addEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
$meta = \stream_get_meta_data($stream);
switch ($meta['stream_type'] ?? '') {
case 'STDIO':
case 'TEMP':
$this->events[(int) $stream] = false;
break;
case 'tcp_socket/ssl':
default:
if (\IS_WINDOWS)
$this->events[(int) $stream] = \uv_poll_init_socket($this->uv, $stream);
else
$this->events[(int) $stream] = \uv_poll_init($this->uv, $stream);
break;
}
}
if ($this->events[(int) $stream] !== false) {
$this->pollEvent($stream);
}
}
protected function removeReadEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
return;
}
if (isset($this->waitingForRead[(int) $stream])) {
$event = $this->events[(int) $stream];
if ($event instanceof \UVPoll) {
\uv_poll_stop($event);
\uv_close($event);
}
unset($this->events[(int) $stream]);
return;
}
$this->pollEvent($stream);
}
protected function removeWriteEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
return;
}
if (isset($this->waitingForWrite[(int) $stream])) {
$event = $this->events[(int) $stream];
if ($event instanceof \UVPoll) {
\uv_poll_stop($event);
\uv_close($event);
}
unset($this->events[(int) $stream]);
return;
}
$this->pollEvent($stream);
}
protected function pollEvent($stream)
{
if (!isset($this->events[(int) $stream])) {
return;
}
$flags = 0;
if (isset($this->waitingForRead[(int) $stream])) {
$flags |= \UV::READABLE;
}
if (isset($this->waitingForWrite[(int) $stream])) {
$flags |= \UV::WRITABLE;
}
\uv_poll_start($this->events[(int) $stream], $flags, $this->onEvent);
}
public function isFsEmpty(): bool
{
return ($this->uvFileSystem == 0);
}
public function fsAdd(): void
{
$this->uvFileSystem++;
}
public function fsRemove(): void
{
$this->uvFileSystem--;
}
public function isIoEmpty(): bool
{
return ($this->uvNetwork == 0);
}
public function ioAdd(): void
{
$this->uvNetwork++;
}
public function ioRemove(): void
{
$this->uvNetwork--;
}
public function setup(bool $useUvLoop = true): CoroutineInterface
{
if ($this->uv instanceof \UVLoop) {
@\uv_stop($this->uv);
@\uv_loop_delete($this->uv);
}
$this->uv = ($useUvLoop && \IS_UV) ? \uv_loop_new() : null;
\spawn_setup($this->uv, true, true, $useUvLoop);
\uv_native($useUvLoop);
return $this;
}
public function getUV(): ?\UVLoop
{
if ($this->uv instanceof \UVLoop)
return $this->uv;
// @codeCoverageIgnoreStart
if (Co::uvNative() && !\IS_UV)
throw new \RuntimeException('Calling method when "libuv" driver not loaded!');
return null;
// @codeCoverageIgnoreEnd
}
public function getParallel(): ParallelInterface
{
return $this->parallel;
}
public function getThread(): Thread
{
return $this->thread;
}
public function addFuture($callable, int $timeout = 0, bool $display = false, $channel = null): FutureInterface
{
$future = $this->parallel->add($callable, $timeout, $channel);
return $display ? $future->displayOn() : $future;
}
public function addThread(int $tid, callable $callable, ...$args): TWorker
{
return $this->thread->create($tid, $callable, ...$args);
}
public function isUv(): bool
{
return (\IS_UV && $this->uv instanceof \UVLoop);
}
public function isPcntl(): bool
{
return \extension_loaded('pcntl')
&& \function_exists('pcntl_async_signals')
&& \function_exists('posix_kill');
}
public function createTask(\Generator $coroutine, bool $isAsync = false): int
{
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
if ($isAsync)
$task->taskType('async');
$this->taskMap[$tid] = $task;
$this->schedule($task);
if (Co::getUnique('parent') === null && \count($this->taskMap) === 1)
Co::setUnique('parent', $tid);
return $tid;
}
public function schedule(TaskInterface $task)
{
$this->taskQueue->enqueue($task);
}
public function addFiber(FiberInterface $fiber)
{
$tid = ++$this->maxTaskId;
$this->taskMap[$tid] = $fiber;
if (Co::getUnique('parent') === null && \count($this->taskMap) === 1)
Co::setUnique('parent', $tid);
elseif (Co::getUnique('supervisor') === null && \count($this->taskMap) === 2)
Co::setUnique('supervisor', $tid);
return $tid;
}
public function scheduleFiber(FiberInterface $fiber)
{
$this->taskQueue->enqueue($fiber);
}
public function isFiber($fiber)
{
return $fiber instanceof FiberInterface;
}
/**
* A `stream/socket/fd` or `event` is free, ready or has data.
* Retrieve `Task`, remove and update scheduler for it's execution.
*
* @param string $type `read` or `write`
* @param mixed $stream
*/
protected function updateScheduler(string $type, $stream, bool $removeEvent = false)
{
if ($type == 'read') {
list(, $tasks) = $this->waitingForRead[(int) $stream];
$this->removeReader($stream, $removeEvent);
foreach ($tasks as $task) {
$this->executeTask($task, $stream);
}
} elseif ($type == 'write') {
list(, $tasks) = $this->waitingForWrite[(int) $stream];
$this->removeWriter($stream, $removeEvent);
foreach ($tasks as $task) {
$this->executeTask($task, $stream);
}
}
}
public function executeTask($task, $parameters = null)
{
if ($task instanceof TaskInterface) {
$this->schedule($task);
} elseif ($task instanceof FiberInterface) {
$this->scheduleFiber($task);
} elseif ($task($parameters) instanceof \Generator) {
$this->createTask($task($parameters));
}
}
public function shutdown(?int $skipTask = 1)
{
if ($skipTask === 1)
$skipTask = Co::getUnique('parent');
if (!empty($this->future))
$this->future->stopAll();
if (!empty($this->taskMap)) {
$map = \array_reverse($this->taskMap, true);
$keys = \array_keys($map);
foreach ($keys as $id) {
if ($id !== $skipTask && $id > 0) {
$this->cancelTask((int) $id);
}
}
}
if (!empty($this->completedMap)) {
foreach ($this->completedMap as $task) {
$task->close();
if (!$task instanceof FiberInterface)
$task->customState('shutdown');
}
}
$this->close();
}
public function cancelledList(): ?array
{
return $this->cancelledMap;
}
public function cancelTask(int $tid, $customState = null, string $errorMessage = 'Invalid task ID!')
{
if (!isset($this->taskMap[$tid])) {
return false;
}
unset($this->taskMap[$tid]);
foreach ($this->taskQueue as $i => $task) {
if ($this->isFiber($task) && ($task->fiberId() === $tid)) {
$task->close();
$task->setState('cancelled');
unset($this->taskQueue[$i]);
$this->cancelledMap[$tid] = true;
break;
} elseif ($task->taskId() === $tid) {
if ($task->getCustomData() instanceof \UVFsEvent)
$this->fsRemove();
$task->close();
if (!empty($customState))
$task->customState($customState);
$task->setState('cancelled');
unset($this->taskQueue[$i]);
$this->cancelledMap[$tid] = true;
break;
}
}
return true;
}
public function cancelProgress(TaskInterface $task)
{
$channel = $task->getCustomState();
if (\is_array($channel) && (\count($channel) == 2)) {
[$channel, $channelTask] = $channel;
if ($channel instanceof ChanneledInterface && \is_int($channelTask) && isset($this->taskMap[$channelTask])) {
unset($this->taskMap[$channelTask]);
foreach ($this->taskQueue as $i => $task) {
if (!$this->isFiber($task) && ($task->taskId() === $channelTask)) {
$task->close();
$task->setState('cancelled');
unset($this->taskQueue[$i]);
$this->cancelledMap[$channelTask] = true;
break;
}
}
}
}
}
public function currentList(): ?array
{
if (!isset($this->taskMap)) {
return null;
}
return $this->taskMap;
}
public function getTask(?int $taskId = 0)
{
return isset($this->taskMap[$taskId]) ? $this->taskMap[$taskId] : null;
}
public function completedList(): ?array
{
if (!isset($this->completedMap)) {
return null;
}
return $this->completedMap;
}
public function isCompleted(int $tid): bool
{
return isset($this->completedMap[$tid]);
}
public function getCompleted(int $tid)
{
if (isset($this->completedMap[$tid]))
return $this->completedMap[$tid];
}
public function updateCompleted(
int $taskId,
array $completeList = [],
?callable $onClear = null,
bool $cancel = false,
bool $forceUpdate = false
): void {
if (isset($completeList[$taskId]) && \is_callable($onClear)) {
$onClear($completeList[$taskId]);
}
if ($cancel) {
$this->cancelTask($taskId);
} else {
if (empty($completeList) || $forceUpdate) {
$completeList = $this->completedList();
}
if (isset($completeList[$taskId])) {
unset($completeList[$taskId]);
}
$this->completedMap = $completeList;
}
}
public function isGroup(int $tid): bool
{
return isset($this->taskGroupMap[$tid]);
}
public function getGroup(): ?array
{
return $this->taskGroupMap;
}
public function getGroupResult(int $tid)
{
$result = $this->taskGroupMap[$tid];
unset($this->taskGroupMap[$tid]);
return $result;
}
public function setGroupResult(int $tid, $value): void
{
$this->taskGroupMap[$tid] = $value;
}
public function ioStop()
{
$this->ioStarted = false;
}
public function futureOn(): void
{
$this->isFutureActive = 'future';
}
public function futureOff(): void
{
$this->isFutureActive = false;
}
public function run()
{
// Check/skip if main supervisor task already running
if (!$this->ioStarted) {
$this->ioStarted = true;
Co::setUnique('supervisor', $this->createTask($this->ioWaiting()));
}
return $this->execute($this->isFutureActive);
}
/**
* Run all `fibers` in the queue.
*
* @param Fiber $fiber
* @return void
*
* @internal
*/
protected function executeFiber(Fiber $fiber)
{
// Skip and reschedule, if `fiber` in suspend state
if ($fiber->isSuspended())
return $this->scheduleFiber($fiber);
$fiber->setState('running');
$fiber->cyclesAdd();
try {
$value = $fiber->run();
} catch (\Throwable $error) {
$returning = $fiber->getCaller();
$returning->setState('erred');
$returning->setException($error);
$this->isFiber($returning)
? $this->scheduleFiber($returning)
: $this->schedule($returning);
return;
}
if ($value instanceof Kernel) {
try {
$value($fiber, $this);
} catch (\Throwable $error) {
$fiber->setState('erred');
$fiber->setException($error);
$this->scheduleFiber($fiber);
}
return;
}
if ($fiber->isFinished()) {
$fiber->setState('completed');
$id = $fiber->fiberId();
$returning = $fiber->getCaller();
$this->isFiber($returning)
? $this->scheduleFiber($returning)
: $this->schedule($returning);
unset($this->taskMap[$id]);
} else {
if (!$value instanceof Kernel && !empty($value)) {
$fiber->setReturn($value);
}
$fiber->setState('rescheduled');
$this->scheduleFiber($fiber);
}
return;
}
public function execute($isReturn = false)
{
while (!$this->taskQueue->isEmpty()) {
/** @var TaskInterface|FiberInterface */
$task = $this->taskQueue->dequeue();
if ($task instanceof FiberInterface) {
$this->executeFiber($task);
continue;
} elseif ($task instanceof TaskInterface) {
$task->setState('running');
$task->cyclesAdd();
$value = $task->run();
if ($value instanceof Kernel) {
try {
$value($task, $this);
} catch (\Throwable $error) {
$task->setState(
($error instanceof CancelledError ? 'cancelled' : 'erred')
);
$task->setException($error);
$this->schedule($task);
}
continue;
}
if ($task->isFinished()) {
$this->cancelProgress($task);
$id = $task->taskId();
if ($task->isStateless()) {
$task->close();
} else {
$state = $task->getState();
$task->setState('completed');
$isTaskGroup = $task->hasGroup();
if ($isTaskGroup)
$task->doneGroup();
if ($task->hasCaller()) {
$unjoined = $task->getCaller();
$task->setCaller();
$final = $task->exception();
$result = $task->result();
$this->taskGroupMap[$id] = ($state === 'erred') ? $final : $result;
$unjoined->sendValue($this->taskGroupMap[$id]);
$this->schedule($unjoined);
} elseif (!$isTaskGroup && $id !== null) {
if ($task->exception() instanceof CancelledError)
$this->cancelTask($id);
else
$this->completedMap[$id] = $task;
}
}
unset($this->taskMap[$id]);
} else {
$task->setState('rescheduled');
$this->schedule($task);
}
if ($isReturn) {
if ($isReturn === 'signaling') {
return $this->ioWaiting();
} elseif ($isReturn === 'channeling') {
$this->ioWaiting();
if (!$this->future->isEmpty())
continue;
} elseif ($isReturn === 'future') {
$this->ioWaiting();
}
return;
}
}
}
}
/**
* Runs all pending timers.
*
* @return int|false
*/
protected function runTimers()
{
if ($this->isUv()) {
return (\count($this->timers) > 0) ? 1 : false;
}
$now = $this->timestamp();
while (($timer = \array_pop($this->timers)) && $timer[0] < $now) {
$this->executeTask($timer[1]);
}
// Add the last timer back to the array.
if ($timer) {
$this->timers[] = $timer;
return \max(0, $timer[0] - $this->timestamp());
}
}
/**
* Check and return `true` for **no** *pending I/O*, *events*, *signals*, *subprocess futures*,
* *streams/sockets/fd* activity, *timers*, *tasks*, or *threads*.
*/
protected function isCoroutinesDone(): bool
{
return $this->taskQueue->isEmpty()
&& empty($this->waitingForRead)
&& empty($this->waitingForWrite)
&& empty($this->timers)
&& $this->future->isEmpty()
&& (isset($this->thread) ? $this->thread->isEmpty() : true)
&& !$this->isSignaling()
&& $this->isIoEmpty()
&& $this->isFsEmpty();
}
protected function waitTime($previousTime)
{
$streamWait = null;
if (\is_numeric($previousTime))
// Wait until the next Timeout should trigger.
$streamWait = $previousTime * 1000000;
elseif (!$this->taskQueue->isEmpty())
// There's a pending 'createTask'. Don't wait.
$streamWait = 0;
elseif (!$this->future->isEmpty())
// There's a running 'future', wait some before rechecking.
$streamWait = $this->future->sleepingTime();
return $streamWait;
}
/**
* Check for `Coroutines`, will exit if nothing is pending.
* This is the main `i/o events` supervisor, the `task` driver for `libuv` or `stream_select`.
*/
protected function ioWaiting()
{
$isUv = $this->isUv();
while (true) {
if ($this->isCoroutinesDone()) {
$this->ioStop();
break;
} else {
$this->future->processing();
$nextTimeout = $this->runTimers();
$streamWait = $this->waitTime($nextTimeout);
$overrideTimeout = $this->isFutureActive ? 0 : $streamWait;
if ($isUv)
\uv_run(
$this->uv,
($streamWait || $this->channelCounter || $this->isFutureActive !== false ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)
);
$this->ioSocketStream($isUv ? 0 : $overrideTimeout);
yield;
}
}
}
/**
* Wait for activity, or until the next timer is due.
*
* @param integer|null $timeout microseconds, or null to wait forever.
*/
protected function ioSocketStream($timeout)
{
if (empty($this->waitingForRead) && empty($this->waitingForWrite)) {
return;
}
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = []; // dummy
if (!@\stream_select(
$rSocks,
$wSocks,
$eSocks,
(null === $timeout) ? null : 0,
$timeout ? (int) ($timeout * (($timeout === null) ? 1000000 : 1)) : 0
)) {
return;
}
foreach ($rSocks as $socket) {
$this->updateScheduler('read', $socket);
}
foreach ($wSocks as $socket) {
$this->updateScheduler('write', $socket);
}
}
public function addReader($stream, $task, bool $addEvent = true): CoroutineInterface
{
$already = true;
if (isset($this->waitingForRead[(int) $stream])) {
$already = false;
$this->waitingForRead[(int) $stream][1][] = $task;
} else {
$this->waitingForRead[(int) $stream] = [$stream, [$task]];
}
if ($this->isUv() && $already && $addEvent)
$this->addEvent($stream);
return $this;
}
public function addWriter($stream, $task, bool $addEvent = true): CoroutineInterface
{
$already = true;
if (isset($this->waitingForWrite[(int) $stream])) {
$already = false;
$this->waitingForWrite[(int) $stream][1][] = $task;
} else {
$this->waitingForWrite[(int) $stream] = [$stream, [$task]];
}
if ($this->isUv() && $already && $addEvent)
$this->addEvent($stream);
return $this;
}
public function removeReader($stream, bool $removeEvent = true): CoroutineInterface
{
if ($this->isUv() && $removeEvent) {
$this->removeReadEvent($stream);
}
unset($this->waitingForRead[(int) $stream]);
return $this;
}
public function removeWriter($stream, bool $removeEvent = true): CoroutineInterface
{
if ($this->isUv() && $removeEvent) {
$this->removeWriteEvent($stream);
}
unset($this->waitingForWrite[(int) $stream]);
return $this;
}
public function addSignal($signal, $listener)
{
if (!$this->signaler)
return;
$first = $this->signaler->count($signal) === 0;
$this->signaler->add($signal, $listener);
if ($first && $this->isPcntl()) {
\pcntl_signal($signal, array($this->signaler, 'execute'));
} elseif ($this->isUv() || $this->isUvSignal) {
if (!isset($this->signals[$signal])) {
$signals = $this->signaler;
$this->signals[$signal] = \uv_signal_init($this->uv);
\uv_signal_start($this->signals[$signal], function ($signal, $signalInt) use ($signals) {
$signals->execute($signalInt);
}, $signal);
}
}
}
public function removeSignal($signal, $listener)
{
if (!$this->signaler || !$this->signaler->count($signal))
return;
$this->signaler->remove($signal, $listener);
if ($this->signaler->count($signal) === 0 && $this->isPcntl()) {
\pcntl_signal($signal, \SIG_DFL);
} elseif ($this->isUv() || $this->isUvSignal) {
if (isset($this->signals[$signal]) && $this->signaler->count($signal) === 0) {
if (\uv_is_active($this->signals[$signal]))
@\uv_signal_stop($this->signals[$signal]);
unset($this->signals[$signal]);
}
}
}
/**
* Setup Signal listener.
*/
public function initSignals()
{
$this->isUvSignal = \IS_UV;
if (empty($this->signaler) && ($this->isPcntl() || $this->isUv() || $this->isUvSignal)) {
$this->signaler = new Signaler($this);
if ($this->isPcntl()) {
$this->isUvSignal = false;
\pcntl_async_signals(true);
}
}
}
public function isSignaling()
{
if (!$this->signaler)
return;
return !$this->signaler->isEmpty();
}
public function getSignaler()
{
return $this->signaler;
}
protected function addTimer($interval, $callback)
{
$timer = \uv_timer_init($this->uv);
$this->timers[(int) $timer] = [$interval, $callback];
\uv_timer_start(
$timer,
$interval,
0,
$this->onTimer
);
return $timer;
}
public function clearTimeout(TaskInterface $task): void
{
$timer = $task->getTimer();
if ($this->isUv() && $timer instanceof \UVTimer && \uv_is_active($timer)) {
@\uv_timer_stop($timer);
\uv_unref($timer);
unset($this->timers[(int) $timer]);
$task->setTimer();
} elseif (\is_float($timer)) {
foreach ($this->timers as $index => $timers) {
if ($timers[0] === $timer) {
unset($this->timers[$index]);
$task->setTimer();
break;
}
}
}
}
public function addTimeout($task = null, float $timeout = 0.0, int $tid = null)
{
if ($this->isUv()) {
$interval = (int) \round($timeout * 1000);
$timer = \uv_timer_init($this->uv);
$this->timers[(int) $timer] = [$interval, $task];
\uv_timer_start(
$timer,
$interval,
0,
$this->onTimer
);
if (\is_integer($tid))
$this->getTask($tid)->setTimer($timer);
return;
}
$triggerTime = $this->timestamp() + ($timeout);
if (\is_integer($tid))
$this->getTask($tid)->setTimer($triggerTime);
if (!$this->timers) {
// Special case when the timers array was empty.
$this->timers[] = [$triggerTime, $task];
return;
}
// We need to insert these values in the timers array, but the timers
// array must be in reverse-order of trigger times.
//
// So here we search the array for the insertion point.
$index = \count($this->timers) - 1;
while (true) {
if ($triggerTime < $this->timers[$index][0]) {
\array_splice(
$this->timers,
$index + 1,
0,
[[$triggerTime, $task]]
);
break;
} elseif (0 === $index) {
\array_unshift($this->timers, [$triggerTime, $task]);
break;
}
--$index;
}
}
public static function value($value)
{
return new ReturnValue($value);
}
public static function plain($value)
{
return new PlainValue($value);
}
/**
* Wait on keyboard input.
* Will not block other task on `Linux`, will continue other tasks until `enter` key is pressed,
* Will block on Windows, once an key is typed/pressed, will continue other tasks `ONLY` if no key is pressed.
* - This function needs to be prefixed with `yield`
*
* @return string
*/
public static function input(int $size = 256, bool $error = false)
{
//Check on STDIN stream
$blocking = \stream_set_blocking(\STDIN, false);
if ($error && !$blocking) {
throw new InvalidArgumentException('Non-blocking STDIN, could not be enabled.');
}
// @codeCoverageIgnoreStart
yield Kernel::readWait(\STDIN);
if (\IS_WINDOWS) {
$windows7 = \strpos(\php_uname('v'), 'Windows 7') !== false;
// kinda of workaround to allow non blocking under Windows 10, if no key is typed, will block after key press
if (!$blocking) {
while (true) {
$tell = \ftell(\STDIN);
if (\is_int($tell) || $windows7)
break;
else
yield;
}
}
}
return \stream_get_line(\STDIN, $size, \EOL);
// @codeCoverageIgnoreEnd
}
public static function create(\Generator $gen)
{
$stack = new \SplStack;
$exception = null;
for (;;) {
try {
if ($exception instanceof \Throwable) {
$gen->throw($exception);
$exception = null;
continue;
}
$value = $gen->current();
if ($value instanceof \Generator) {
$stack->push($gen);
$gen = $value;
continue;
}
$isReturnValue = $value instanceof ReturnValue;
if (!$gen->valid() || $isReturnValue) {
if ($stack->isEmpty()) {
return;
}
$return = null;
if (!$gen->valid() && !$isReturnValue) {
$return = $gen->getReturn();
}
$gen = $stack->pop();
$gen->send($isReturnValue ? $value->getValue() : $return);
continue;
}
if ($value instanceof PlainValue) {
$value = $value->getValue();
}
try {
$sendValue = (yield $gen->key() => $value);
} catch (\Throwable $e) {
$gen->throw($e);
continue;
}
$gen->send($sendValue);
} catch (\Throwable $e) {
if ($stack->isEmpty()) {
throw $e;
}
$gen = $stack->pop();
$exception = $e;
}
}
}
}