src/Tasks/TaskQueue.php
<?php
declare(strict_types=1);
namespace Smuuf\Primi\Tasks;
use \Smuuf\Primi\Context;
use \Smuuf\Primi\Helpers\Func;
use \Smuuf\StrictObject;
class TaskQueue {
use StrictObject;
private Context $context;
/** Run queued tasks after this time interval passes (in seconds). */
public static float $interval = 0.25;
/** For measuring time. */
private float $timer;
/**
* FIFO queue for scheduling tasks.
*
* @var array<array{TaskInterface, float}>
*/
private array $queue = [];
/**
* Random ID for this queue instance (safer than spl_object_id() or similar
* for checking uniqueness).
*/
private string $id;
public function __construct(Context $ctx) {
$this->id = Func::unique_id();
$this->context = $ctx;
$this->timer = Func::monotime();
}
public function getId(): string {
return $this->id;
}
public function addTask(TaskInterface $task, float $delay = 0): void {
$eta = Func::monotime() + $delay;
$this->queue[] = [$task, $eta];
}
public function tick(): void {
if (!$this->queue) {
return;
}
if ((Func::monotime() - $this->timer) < self::$interval) {
return;
}
$this->timer = Func::monotime();
$this->executeQueued();
}
/**
* Run all remaining tasks.
*
* Tasks with ETA in the future are skipped and kept in the queue..
*/
public function deplete(): void {
$this->executeQueued(\true);
}
/**
* Execute queued tasks. Tasks with ETA in the future will be skipped
* and placed into the queue again, unless `$force` parameter is `true`, in
* which case all tasks, regardless on their ETA, will be executed.
*/
private function executeQueued(bool $force = \false): void {
// Because asynchronous events (e.g. signals) could modify (add tasks to)
// the $queue property while we're iterating through it, and
// the same (adding more tasks) could be done by any tasks we're now
// actually going to execute, we need to handle these edge-case
// situations.
// Create a copy of the queue and empty the main queue, so that any
// new entries are added to the empty queue.
[$queue, $this->queue] = [$this->queue, []];
$skipped = [];
foreach ($queue as [$task, $eta]) {
if ($eta > Func::monotime() && !$force) {
$skipped[] = [$task, $eta];
continue;
}
$this->executeTask($task);
}
// If there were any tasks skipped, add it to the end of the "cleared"
// queue (but which may now already have new tasks in it. See above.)
if ($skipped) {
$this->queue = \array_merge($this->queue, $skipped);
}
}
private function executeTask(TaskInterface $task): void {
$task->execute($this->context);
}
}