Coroutine/Misc/Queue.php
<?php
declare(strict_types=1);
namespace Async\Misc;
use Async\QueueFull;
use Async\QueueEmpty;
use Async\LengthException;
/**
* A `queue`, useful for coordinating _producer_ and _consumer_ coroutines.
* This class is an asynchronous wrapper around the standard PHP **SplQueue** class.
*
* @see https://curio.readthedocs.io/en/latest/reference.html#queues
* @see https://docs.python.org/3.10/library/asyncio-queue.html#asyncio-queues
* @source https://github.com/python/cpython/blob/3.10/Lib/asyncio/queues.py
*/
final class Queue
{
    /**
     * Maximum number of items the queue may hold; `0` or less means unbounded.
     *
     * @var int|null
     */
    protected $max_size = 0;

    /**
     * Underlying FIFO storage, created in the constructor.
     *
     * @var \SplQueue|null
     */
    protected $queue = null;

    /**
     * `true` while no item that was put is still waiting for `task_done()`.
     *
     * @var bool|null
     */
    protected $finished = false;

    /**
     * Number of items that were put but not yet acknowledged via `task_done()`.
     *
     * @var int|null
     */
    protected $unfinished_tasks = 0;

    /**
     * If maxsize is less than or equal to zero, the queue size is infinite. If it
     * is an integer greater than `0`, then `put()` will block when the
     * queue reaches maxsize, until an item is removed by `get()`.
     *
     * @param int $maxsize
     */
    public function __construct(int $maxsize = 0)
    {
        $this->max_size = $maxsize;
        // Nothing has been queued yet, so there is nothing for `join()` to wait on.
        $this->finished = true;
        $this->queue = new \SplQueue();
    }

    /**
     * Drop the reference to the storage and reset all bookkeeping.
     */
    public function __destruct()
    {
        $this->queue = null;
        $this->max_size = null;
        $this->finished = null;
        $this->unfinished_tasks = null;
    }

    /**
     * Remove and return the item at the head of the underlying storage.
     *
     * @return mixed
     */
    protected function _get()
    {
        return $this->queue->dequeue();
    }

    /**
     * Append an item to the tail of the underlying storage.
     *
     * @param mixed $item
     * @return void
     */
    protected function _put($item)
    {
        $this->queue->enqueue($item);
    }

    /**
     * Number of items in the queue.
     *
     * @return int
     */
    public function size(): int
    {
        return $this->queue->count();
    }

    /**
     * Number of items allowed in the queue.
     *
     * @return int
     */
    public function maxsize(): int
    {
        return $this->max_size;
    }

    /**
     * Return `true` if the queue is empty, `false` otherwise.
     *
     * @return bool
     */
    public function empty(): bool
    {
        return $this->queue->isEmpty();
    }

    /**
     * Return `true` if there are maxsize items in the queue.
     *
     * Note: if the Queue was initialized with maxsize=0 (the default),
     * then `full()` is never `true`.
     *
     * @return bool
     */
    public function full(): bool
    {
        if ($this->max_size <= 0) {
            return false;
        }

        return $this->size() >= $this->max_size;
    }

    /**
     * Put an `item` into the queue.
     *
     * If the queue is full, `wait` (suspend via `yield`) until a free
     * slot is available before adding the item.
     *
     * - This function needs to be prefixed with `yield`
     *
     * @param mixed $item
     * @return \Generator completes once the item has been stored
     * @throws \Throwable whatever was thrown into the generator while it was waiting
     */
    public function put($item): \Generator
    {
        try {
            while ($this->full()) {
                yield;
            }
        } catch (\Throwable $e) {
            // Something was thrown into this coroutine while it waited for a slot.
            // NOTE(review): `\kill_task()` is a project helper defined elsewhere;
            // presumably it cancels the current task - confirm.
            try {
                yield \kill_task();
            } catch (\Throwable $other) {
                // Best effort only: the original error is the one that matters.
            }

            throw $e;
        }

        return $this->put_nowait($item);
    }

    /**
     * Put an `item` into the queue without blocking.
     *
     * @param mixed $item
     * @return void
     * @throws QueueFull if no free slot is immediately available
     */
    public function put_nowait($item): void
    {
        if ($this->full()) {
            throw new QueueFull("No free slot available!");
        }

        $this->_put($item);
        $this->unfinished_tasks++;
        $this->finished = false;
    }

    /**
     * Remove and return an `item` from the queue.
     *
     * If queue is empty, `wait` (suspend via `yield`) until an item is available.
     *
     * - This function needs to be prefixed with `yield`
     *
     * @return \Generator whose return value is the dequeued item
     * @throws \Throwable whatever was thrown into the generator while it was waiting
     */
    public function get(): \Generator
    {
        try {
            while ($this->empty()) {
                yield;
            }
        } catch (\Throwable $e) {
            // Something was thrown into this coroutine while it waited for an item.
            // NOTE(review): `\kill_task()` is a project helper defined elsewhere;
            // presumably it cancels the current task - confirm.
            try {
                yield \kill_task();
            } catch (\Throwable $other) {
                // Best effort only: the original error is the one that matters.
            }

            throw $e;
        }

        return $this->get_nowait();
    }

    /**
     * Remove and return an item from the queue.
     *
     * Does not touch the unfinished-task counter; consumers acknowledge
     * completion separately through `task_done()`.
     *
     * @return mixed
     * @throws QueueEmpty if no item is immediately available
     */
    public function get_nowait()
    {
        if ($this->empty()) {
            throw new QueueEmpty('No item available!');
        }

        return $this->_get();
    }

    /**
     * Indicate that a formerly `enqueued` task is complete.
     *
     * Used by queue _consumers_. For each `get()` used to fetch a task,
     * a subsequent call to `task_done()` tells the queue that the processing
     * on the task is complete.
     *
     * If a `join()` is currently blocking, it will resume when all items have
     * been processed (meaning that a `task_done()` call was received for every
     * item that had been `put()` into the queue).
     *
     * @throws LengthException if called more times than there were items placed in
     * the queue.
     * @return void
     */
    public function task_done(): void
    {
        if ($this->unfinished_tasks <= 0) {
            throw new LengthException('task_done() called too many times!');
        }

        $this->unfinished_tasks--;
        if ($this->unfinished_tasks === 0) {
            $this->finished = true;
        }
    }

    /**
     * Block until all items in the queue have been gotten and processed.
     *
     * The count of unfinished tasks goes up whenever an item is added to the
     * queue. The count goes down whenever a consumer calls `task_done()` to
     * indicate that the item was retrieved and all work on it is complete.
     * When the count of unfinished tasks drops to zero, `join()` unblocks.
     *
     * - This function needs to be prefixed with `yield`
     *
     * @return \Generator completes once every queued item has been marked done
     */
    public function join(): \Generator
    {
        // Re-check after every single suspension so the caller resumes on the
        // first step after the last `task_done()`, not at the end of a batch.
        while ($this->unfinished_tasks > 0) {
            yield;
        }
    }
}