k911/swoole-bundle

View on GitHub
src/Coroutine/CoroutinePool.php

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
<?php

declare(strict_types=1);

namespace K911\Swoole\Coroutine;

use Assert\Assertion;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Scheduler;
use Throwable;

/**
 * @internal
 */
final class CoroutinePool
{
    private $coroutines;
    private $coroutinesCount;
    private $resultsChannel;
    private $results;
    private $exceptions;
    private $started;

    public function __construct(Channel $resultsChannel, callable ...$coroutines)
    {
        $this->coroutines = $coroutines;
        $this->coroutinesCount = \count($coroutines);
        $this->resultsChannel = $resultsChannel;
        $this->results = [];
        $this->exceptions = [];
        $this->started = false;
    }

    public static function fromCoroutines(callable ...$coroutines): self
    {
        return new self(new Channel(1), ...$coroutines);
    }

    /**
     * Blocks until all coroutines have been finished.
     */
    public function run(): array
    {
        $this->start();

        // TODO: Create parent exception containing all child exceptions and throw it instead
        if (\count($this->exceptions) > 0) {
            throw $this->exceptions[0];
        }

        return $this->results;
    }

    private function start(): void
    {
        Assertion::false($this->started, 'Single PoolExecutor cannot be run twice.');
        $this->started = true;

        if (self::inCoroutine()) {
            $this->startWaitGroup();

            return;
        }

        $this->startScheduler();
    }

    private function startWaitGroup(): void
    {
        foreach ($this->coroutines as $coroutine) {
            Coroutine::create($this->wrapCoroutine($this->resultsChannel, $coroutine));
        }

        Coroutine::create($this->makeGatherResults());
    }

    private function startScheduler(): void
    {
        $scheduler = new Scheduler();

        foreach ($this->coroutines as $coroutine) {
            $scheduler->add($this->wrapCoroutine($this->resultsChannel, $coroutine));
        }

        $scheduler->add($this->makeGatherResults());
        $scheduler->start();
    }

    private function makeGatherResults(): \Closure
    {
        return function (): void {
            while ($this->coroutinesCount > 0) {
                $result = $this->resultsChannel->pop(-1);
                $outputName = $result instanceof Throwable ? 'exceptions' : 'results';
                $this->{$outputName}[] = $result;
                --$this->coroutinesCount;
            }
        };
    }

    private function wrapCoroutine(Channel $resultChannel, callable $coroutine): \Closure
    {
        return static function () use ($resultChannel, $coroutine): void {
            $result = null;

            try {
                $result = $coroutine() ?? true;
            } catch (\Throwable $exception) {
                $result = $exception;
            }
            $resultChannel->push($result);
        };
    }

    private static function inCoroutine(): bool
    {
        return -1 !== Coroutine::getuid();
    }
}