bkdotcom/PHPDebugConsole

View on GitHub
src/CurlHttpMessage/Handler/CurlMulti.php

Summary

Maintainability
A
1 hr
Test Coverage
A
96%
<?php

namespace bdk\CurlHttpMessage\Handler;

use bdk\CurlHttpMessage\CurlReqRes;
use bdk\Promise;
use CurlHandle;
use CurlMultiHandle;
use RuntimeException;

/**
 * Take a RequestInterface instance, return a Promise
 */
class CurlMulti extends Curl
{
    /** @var int Will be higher than 0 when `curl_multi_exec` is still running. */
    private $active = 0;

    /** @var resource[]|\CurlHandle[] Idle curl handles */
    private $idleHandles = array();

    /** @var CurlMultiHandle|resource|null|false */
    private $multiHandle;

    /** @var array options */
    protected $options = array();

    /** @var CurlReqRes[] Requests currenting being executed by curl indexed by (int) curlHandle */
    private $processing = array();

    /** @var CurlReqRes[] Request Queue indexed by CurlReqRes hash*/
    private $queue = array();

    /**
     * Constructor
     *
     * @param array $options Options including
     *                          curl :  http://php.net/curl_setopt
     *                          curlMulti : http://php.net/curl_multi_setopt
     *                          selectTimeout : 1
     *
     * @throws RuntimeException
     */
    public function __construct($options = array())
    {
        $this->options = \array_replace_recursive(array(
            'curlMulti' => array(),
            'maxConcurrent' => 10,
            'maxIdleHandles' => 10,
            'selectTimeout' => 1,
        ), $options);
        $this->init();
    }

    /**
     * Destructor
     */
    public function __destruct()
    {
        if (isset($this->multiHandle)) {
            \curl_multi_close($this->multiHandle);
            unset($this->multiHandle);
        }
    }

    /**
     * Invoke handler
     *
     * @param CurlReqRes $curlReqRes CurlReqRes instance
     *
     * @return Promise
     */
    public function __invoke(CurlReqRes $curlReqRes)
    {
        $hash = \spl_object_hash($curlReqRes);
        $this->queue[$hash] = $curlReqRes;
        $promise = new Promise(
            array($this, 'process'),
            function () use ($curlReqRes) {
                $this->cancel($curlReqRes);
            }
        );
        $curlReqRes->setPromise($promise);
        return $promise;
    }

    /**
     * Process the request queue
     *
     * @return void
     */
    public function process()
    {
        if ($this->active) {
            return;
        }
        while ($this->processing || $this->queue) {
            $this->tick();
        }
    }

    /**
     * Cancels a handle from sending and removes references to it.
     *
     * @param CurlReqRes $curlReqRes cURL / request / response
     *
     * @return void
     */
    private function cancel(CurlReqRes $curlReqRes)
    {
        $curlHandle = $curlReqRes->getCurlHandle();
        if ($curlHandle) {
            $id = (int) $curlHandle;
            unset($this->processing[$id]);
            \curl_multi_remove_handle($this->multiHandle, $curlHandle);
            $this->releaseHandle($curlReqRes);
        }
        $hash = \spl_object_hash($curlReqRes);
        if (isset($this->queue[$hash])) {
            unset($this->queue[$hash]);
        }
    }

    /**
     * Reuse existing curl handle or init a new one
     *
     * @return resource|CurlHandle
     */
    private function getCurlHandle()
    {
        return $this->idleHandles
            ? \array_pop($this->idleHandles)
            : \curl_init();
    }

    /**
     * Return delay (in milli seconds)
     *
     * @return int
     */
    private function getDelay()
    {
        $currentTime = \microtime(true);
        $nextTime = \PHP_INT_MAX;
        foreach ($this->queue as $curlReqRes) {
            $time = $curlReqRes->getOption('noEarlierThan');
            if ($time !== null && $time < $nextTime) {
                $nextTime = $time;
            }
        }
        return $nextTime !== PHP_INT_MAX
            ? \ceil(\max(0, $nextTime - $currentTime) * 1000)
            : 0;
    }

    /**
     * Initialize cURL multi handler
     *
     * @return void
     *
     * @throws \RuntimeException
     */
    protected function init()
    {
        $this->multiHandle = \curl_multi_init();
        if ($this->multiHandle === false) {
            throw new RuntimeException('Can not initialize curl multi handle.');
        }
        foreach ($this->options['curlMulti'] as $option => $value) {
            // A warning is raised in case of a wrong option.
            // phpcs:ignore SlevomatCodingStandard.Namespaces.FullyQualifiedGlobalFunctions.NonFullyQualified
            curl_multi_setopt($this->multiHandle, $option, $value);
        }
    }

    /**
     * Take CurlReqRes from the queue and fill the processing bucket
     *
     * @return void
     */
    protected function manageQueue()
    {
        $currentTime = \microtime(true);
        foreach ($this->queue as $hash => $curlReqRes) {
            if (\count($this->processing) >= $this->options['maxConcurrent']) {
                break;
            }
            if ($curlReqRes->getOption('noEarlierThan') > $currentTime) {
                continue;
            }
            unset($this->queue[$hash]);
            $curlHandle = $this->getCurlHandle();
            $curlReqRes->setCurlHandle($curlHandle);
            \curl_multi_add_handle($this->multiHandle, $curlHandle);
            $id = (int) $curlHandle;
            $this->processing[$id] = $curlReqRes;
        }
    }

    /**
     * Release and recycle curl handle
     *
     * @param CurlReqRes $curlReqRes Curl Request/Response
     *
     * @return void
     */
    private function releaseHandle(CurlReqRes $curlReqRes)
    {
        $curlHandle = $curlReqRes->getCurlHandle();
        $curlReqRes->setCurlHandle(null);
        if (\count($this->idleHandles) >= $this->options['maxIdleHandles']) {
            \curl_close($curlHandle);
            return;
        }
        if (\function_exists('curl_reset')) {
            \curl_reset($curlHandle);
            $this->idleHandles[] = $curlHandle;
        }
    }

    /**
     * Ticks the curl event loop.
     *
     * @return void
     */
    protected function tick()
    {
        $this->manageQueue();

        // If there are no transfers, then sleep for the next delay
        $delay = !$this->active
            ? $this->getDelay()
            : 0;
        if ($delay) {
            $microsec = $delay * 1000;
            \usleep((int) $microsec);
        }

        // Step through the task queue which may add additional requests.
        Promise::queue()->run();

        if ($this->active && \curl_multi_select($this->multiHandle, $this->options['selectTimeout']) === -1) {
            // Perform a usleep if a select returns -1.
            // See: https://bugs.php.net/bug.php?id=61141
            \usleep(250);
        }

        while (\curl_multi_exec($this->multiHandle, $this->active) === \CURLM_CALL_MULTI_PERFORM);

        $this->curlMultiInfoRead();
    }

    /**
     * Get results from requests curl in processing
     *
     * @return void
     */
    private function curlMultiInfoRead()
    {
        while ($done = \curl_multi_info_read($this->multiHandle)) {
            if ($done['msg'] !== CURLMSG_DONE) {
                // if it's not done, then it would be premature to remove the handle.
                continue;
            }
            $id = (int) $done['handle'];
            \curl_multi_remove_handle($this->multiHandle, $done['handle']);

            if (!isset($this->processing[$id])) {
                // Probably was cancelled.
                continue;
            }

            $curlReqRes = $this->processing[$id];
            unset($this->processing[$id]);
            $response = $curlReqRes->finish();
            $this->releaseHandle($curlReqRes);
            $curlReqRes->getPromise()->resolve(
                // Promise::promiseFor()
                $response
            );
        }
    }
}