symplely/hyper

View on GitHub
Request/Hyper.php

Summary

Maintainability
D
1 day
Test Coverage
<?php

declare(strict_types=1);

namespace Async\Request;

use Async\Coroutine\Kernel;
use Async\Coroutine\CoroutineInterface;
use Async\Coroutine\TaskInterface;
use Async\Request\Uri;
use Async\Request\Request;
use Async\Request\Response;
use Async\Request\AsyncStream;
use Async\Request\Body;
use Async\Request\BodyInterface;
use Async\Request\HyperInterface;
use Async\Request\Exception\ClientException;
use Async\Request\Exception\NetworkException;
use Async\Request\Exception\RequestException;
use Psr\Http\Message\UriInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\StreamInterface;
use Psr\Log\LoggerInterface;

/**
 * Class Hyper
 *
 * @package Async\Request\Hyper
 */
class Hyper implements HyperInterface
{
    /**
     * Set of key => value pairs to include as default headers with request calls.
     *
     * Headers are only added when using the `request` method
     * (or any of the built-in HTTP method calls: get, post, put, etc.).
     *
     * `request()` applies every entry under the `headers` key with
     * `withAddedHeader()` before it applies the caller's own headers with
     * `withHeader()`.
     */
    const HEADERS = [
        'headers' => [
            'Accept' => '*/*',
            'Accept-Charset' => 'utf-8',
            'Accept-Language' => 'en-US,en;q=0.9',
            'X-Powered-By' => 'PHP/' . \PHP_VERSION,
            'Connection' => 'close'
        ]
    ];

    /**
     * Default options.
     *
     * `sendRequest()` passes these as the `http` section of the stream context
     * whenever the request carries no options of its own, and `request()`
     * merges caller supplied options over them. `protocol_version` is also the
     * protocol version given to every request built by `request()`.
     */
    const OPTIONS = [
        'protocol_version' => '1.1',
        'follow_location' => 1,
        'request_fulluri' => false,
        'max_redirects' => 10,
        'ignore_errors' => true,
        'timeout' => 2,
        'user_agent' => \SYMPLELY_USER_AGENT,
    ];

    /**
     * Response stream created by the last successful `sendRequest()` call,
     * closed by `close()` and dropped by `flush()`.
     *
     * @var AsyncStream|StreamInterface|null
     */
    protected $stream = null;

    /**
     * The request most recently built by `request()`.
     *
     * @var Request|RequestInterface|null
     */
    protected $request = null;

    /**
     * Value to be used with `stream_set_timeout()`
     *
     * Set by `selectSendRequest()`; `sendRequest()` multiplies it by
     * `\RETRY_MULTIPLY` before handing it to `stream_set_timeout()`.
     *
     * @var float
     */
    protected $timeout = \RETRY_TIMEOUT;

    /**
     * Task id of the request, taken from `Kernel::getTask()` the first time
     * `sendRequest()` runs and attached to the response stream.
     *
     * @var int|null presumably an integer task id -- TODO confirm against `Kernel::getTask()`
     */
    protected $httpId = null;

    /**
     * Name of the logger passed to the `\log_*()` helpers.
     *
     * @var string
     */
    protected $loggerName = '';

    /**
     * Logger instance obtained from `\hyper_logger()`; null after `flush()`.
     *
     * @var LoggerInterface|null
     */
    protected $logger = null;

    /**
     * flag for gzip and deflate response content encoding support.
     *
     * Enabled through `withEncoding()`; when set, `request()` advertises
     * `Accept-Encoding` and `sendRequest()` inflates encoded response bodies.
     *
     * @var bool
     */
    protected $hasZlib = false;

    /**
     * Create the client and bind it to a logger.
     *
     * @param string|null $loggerName Name of the logger to use. An empty value
     *                                selects the `-` logger, which is then also
     *                                passed to `\logger_array()`.
     */
    public function __construct(?string $loggerName = null)
    {
        $useDefaultLogger = empty($loggerName);

        $this->loggerName = $useDefaultLogger ? '-' : $loggerName;
        $this->logger = \hyper_logger($this->loggerName);

        if ($useDefaultLogger) {
            \logger_array(0xff, 1, null, $this->loggerName);
        }
    }

    /**
     * Close the response stream, when one is held, then reset this instance
     * through `flush()`.
     *
     * @return void
     */
    public function close()
    {
        $stream = $this->stream;
        if ($stream instanceof StreamInterface) {
            $stream->close();
        }

        $this->flush();
    }

    /**
     * Reset every per-request property to its initial value and detach the logger.
     *
     * The stream is only dropped here, not closed; use `close()` for that.
     *
     * @return void
     */
    public function flush()
    {
        // Logger binding.
        $this->loggerName = '';
        $this->logger = null;

        // Request/response state.
        $this->stream = null;
        $this->request = null;
        $this->httpId = null;

        // Transfer settings.
        $this->timeout = \RETRY_TIMEOUT;
        $this->hasZlib = false;
    }

    /**
     * Turn on compressed-response support when the zlib incremental
     * inflate API (`inflate_init`) is available, and turn it off otherwise.
     *
     * @return HyperInterface this instance, for chaining
     */
    public function withEncoding(): HyperInterface
    {
        $this->hasZlib = \function_exists('inflate_init');

        return $this;
    }

    /**
     * Return the logger instance together with the logger name in use.
     *
     * @return array two-element list: `[0]` the logger instance (null after
     *               `flush()`), `[1]` the logger name
     */
    public function logger(): array
    {
        $instance = $this->logger;
        $name = $this->loggerName;

        return [$instance, $name];
    }

    /**
     * @inheritdoc
     *
     * Installs the request-aware gather callbacks, then delegates to
     * `Kernel::gatherWait()` with the arguments unchanged.
     */
    public static function await(
        array $requests,
        int $count = 0,
        bool $exception = true,
        bool $clearAborted = true
    ) {
        self::waitController();

        $gathered = Kernel::gatherWait($requests, $count, $exception, $clearAborted);

        return $gathered;
    }

    /**
     * Register the callbacks `Kernel::gatherController()` uses while gathering
     * request tasks that were created in the `beginning` custom state.
     *
     * @return void
     */
    protected static function waitController()
    {
        // No dedicated handling for erred or cancelled tasks.
        $onError = null;
        $onCancel = null;

        // A request task that had already completed before fetch()/wait() ran:
        // mark it ended, reset its client and hand back the result.
        $onAlreadyCompleted = function (TaskInterface $task) {
            $task->customState('ended');

            $client = $task->getCustomData();
            if ($client instanceof HyperInterface) {
                $client->flush();
            }

            return $task->result();
        };

        // A request task that has not started yet: force it to run.
        $onRequestNotStarted = function (TaskInterface $task, CoroutineInterface $coroutine) {
            try {
                $runnable = ($task->getState() === 'running') || $task->isRescheduled();
                if (!$runnable && $task->isCustomState('beginning') && !$task->isCompleted()) {
                    $coroutine->schedule($task);
                    $runnable = true;
                }

                if ($runnable) {
                    $coroutine->execute(true);
                }

                if ($task->isCompleted() || $task->isErred()) {
                    $task->customState();
                }
            } catch (\Throwable $failure) {
                $task->setState('erred');
                $task->setException($failure);
                $coroutine->schedule($task);
                $coroutine->execute(true);
            }
        };

        // A request task that finished while gathering: same treatment as one
        // that was already completed.
        $onCompletedRequests = function (TaskInterface $task) {
            $task->customState('ended');

            $client = $task->getCustomData();
            if ($client instanceof HyperInterface) {
                $client->flush();
            }

            return $task->result();
        };

        // A task dropped from the current/running list: its response/request
        // will not be used, so abort it and close the client.
        $onRequestsToClear = function (TaskInterface $task) {
            $task->customState('aborted');

            $client = $task->getCustomData();
            if ($client instanceof HyperInterface) {
                $client->close();
            }
        };

        Kernel::gatherController(
            'beginning',
            $onAlreadyCompleted,
            $onRequestNotStarted,
            $onCompletedRequests,
            $onError,
            $onCancel,
            $onRequestsToClear
        );
    }

    /**
     * @inheritdoc
     *
     * Installs the request-aware gather callbacks, then delegates to
     * `Kernel::gather()` with the given task ids.
     */
    public static function wait(...$httpId)
    {
        self::waitController();

        $gathered = Kernel::gather(...$httpId);

        return $gathered;
    }

    /**
     * @inheritdoc
     *
     * Hands the generator to `Kernel::away()` in the `beginning` custom state,
     * with the client attached as the task's custom data.
     */
    public static function awaitable(\Generator $httpFunction, HyperInterface $hyper)
    {
        $task = Kernel::away($httpFunction, 'beginning', $hyper);

        return $task;
    }

    /**
     * @inheritdoc
     *
     * Delegates to `Kernel::cancelTask()` using the `aborted` custom state and
     * the `\BAD_ID` message constant.
     */
    public static function cancel(int $httpId)
    {
        $cancelled = Kernel::cancelTask($httpId, 'aborted', \BAD_ID);

        return $cancelled;
    }

    /**
     * Send a request, retrying with a longer timeout when the remote end did
     * not respond.
     *
     * The effective timeout is `$timeout` when `$withTimeout` is set and
     * `\REQUEST_TIMEOUT` otherwise. On a `ClientException` whose message
     * mentions `respond`, or (on the first attempt only) `failed to open stream`,
     * the request is re-sent with one attempt fewer and the timeout multiplied by
     * `\RETRY_MULTIPLY`. Any other client error is logged and then either
     * thrown or returned as the result, depending on the current task id.
     *
     * @param RequestInterface $request     request to send
     * @param int              $attempts    remaining attempts; nothing is sent when not positive
     * @param float            $timeout     timeout used when `$withTimeout` is true
     * @param bool             $withTimeout use `$timeout` instead of `\REQUEST_TIMEOUT`
     *
     * @return \Generator resolves to the response, to the caught exception, or
     *                    to null once the attempts are used up
     *
     * @throws ClientException when the failure is not retried and the task id is 1, 2 or null
     */
    public function selectSendRequest(
        RequestInterface $request,
        int $attempts = \RETRY_ATTEMPTS,
        float $timeout = \RETRY_TIMEOUT,
        bool $withTimeout = false
    ) {
        if ($attempts <= 0) {
            return;
        }

        $this->timeout = ($withTimeout) ? $timeout : \REQUEST_TIMEOUT;
        try {
            if ($request instanceof Request) {
                $request = $request->withOptions(['timeout' => $this->timeout]);
            } else {
                // PSR-7 header values must be strings, so the float is cast explicitly.
                $request = $request->withHeader('timeout', (string) $this->timeout);
            }

            $response = yield $this->sendRequest($request);
        } catch (ClientException $requestError) {
            $error = $requestError->getMessage();

            // Compare against false: a match at offset 0 is still a match. The search is
            // case-insensitive because PHP 8 reports "Failed to open stream".
            $notResponding = \stripos($error, 'respond') !== false;
            $firstOpenFailure = ($attempts === \RETRY_ATTEMPTS)
                && (\stripos($error, 'failed to open stream') !== false);

            if ($notResponding || $firstOpenFailure) {
                $attempts--;
                $timeout = $timeout * \RETRY_MULTIPLY;
                yield \log_debug(
                    'On task: {taskId} {class}, Retry: {attempts} Timeout: {timeout} Exception: {exception}',
                    ['taskId' => $this->httpId, 'class' => __METHOD__, 'attempts' => $attempts, 'timeout' =>  $timeout, 'exception' => $requestError],
                    $this->loggerName
                );

                $response = yield $this->selectSendRequest($request, $attempts, $timeout, true);
            } else {
                yield \log_error(
                    'On task: {taskId} {class}, Timeout: {timeout} Exception: {exception}',
                    ['taskId' => $this->httpId, 'class' => __METHOD__, 'timeout' =>  $timeout, 'exception' => $requestError],
                    $this->loggerName
                );

                // Throw, if this method wasn't called by `request/awaitable`, `fetch/wait`,
                // or any `http_*` functions. The task id value should be anything beside 1, 2 or null.
                if (($this->httpId === 1) || ($this->httpId === 2) || ($this->httpId === null)) {
                    throw $requestError;
                }

                $response = $requestError;
            }
        }

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds a GET request without a body and sends it through `selectSendRequest()`.
     */
    public function get(string $url, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_GET, $url, null, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds a POST request carrying `$data` and sends it through `selectSendRequest()`.
     */
    public function post(string $url, $data = null, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_POST, $url, $data, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Sends a HEAD request and falls back to a GET request when the HEAD
     * attempt produced no usable response: it resolved to nothing (retries
     * exhausted), resolved to an exception, or was answered with
     * `405 Method Not Allowed`.
     */
    public function head(string $url, array ...$authorizeHeaderOptions)
    {
        $response = yield $this->selectSendRequest(
            $this->request(Request::METHOD_HEAD, $url, null, $authorizeHeaderOptions)
        );

        // `selectSendRequest()` resolves to null once its attempts are used up;
        // check for that before calling a method on the result.
        $needsFallback = !\is_object($response)
            || ($response instanceof \Throwable)
            || ($response->getStatusCode() === 405);

        if ($needsFallback) {
            $response = yield $this->selectSendRequest(
                $this->request(Request::METHOD_GET, $url, null, $authorizeHeaderOptions)
            );
        }

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds a PATCH request carrying `$data` and sends it through `selectSendRequest()`.
     */
    public function patch(string $url, $data = null, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_PATCH, $url, $data, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds a PUT request carrying `$data` and sends it through `selectSendRequest()`.
     */
    public function put(string $url, $data = null, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_PUT, $url, $data, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds a DELETE request carrying `$data` and sends it through `selectSendRequest()`.
     */
    public function delete(string $url, $data = null, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_DELETE, $url, $data, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds an OPTIONS request without a body and sends it with its own retry
     * policy: 3 attempts, starting from an explicit timeout of 5.
     */
    public function options(string $url, array ...$authorizeHeaderOptions)
    {
        $request = $this->request(Request::METHOD_OPTIONS, $url, null, $authorizeHeaderOptions);
        $response = yield $this->selectSendRequest($request, 3, 5, true);

        return $response;
    }

    /**
     * @inheritdoc
     *
     * Builds the request in this order: method and URI, default protocol
     * version, default headers, default `User-Agent`, caller options merged
     * over the defaults, caller headers, body with `Content-Type` /
     * `Content-Length`, and finally `Accept-Encoding` when encoding support is
     * enabled. The result is also remembered on the instance.
     */
    public function request($method, $url, $body = null, array ...$authorizeHeaderOptions): RequestInterface
    {
        $sections = $this->optionsHeaderSplicer($authorizeHeaderOptions);
        $defaultOptions = self::OPTIONS;
        $defaultHeaders = self::HEADERS;

        // The first section holds the headers, every later one contributes options.
        $headers = $options = [];
        $position = 0;
        foreach ($sections as $section) {
            $position++;
            if ($position > 1) {
                $options = \array_merge($options, $section);
                continue;
            }

            // A plain list under `headers` is not a usable header map; drop it.
            $headers = isset($section['headers'][0]) ? [] : $section;
        }

        // Build out URI instance
        if (!$url instanceof UriInterface) {
            $url = Uri::create($url);
        }

        // New request with method, target and the default HTTP version.
        $request = (new Request)
            ->withMethod($method)
            ->withUri($url)
            ->withProtocolVersion((string) $defaultOptions['protocol_version']);

        // Default headers first.
        if (!empty($defaultHeaders['headers'])) {
            foreach ($defaultHeaders['headers'] as $name => $value) {
                $request = $request->withAddedHeader($name, $value);
            }
        }

        // Default User-Agent header if none is present yet.
        if ($request->hasHeader('User-Agent') === false) {
            $request = $request->withHeader('User-Agent', \SYMPLELY_USER_AGENT);
        }

        // Caller options, layered over the defaults.
        if (!empty($options) && $request instanceof Request) {
            $request = $request->withOptions(\array_merge($defaultOptions, $options));
        }

        // Caller headers replace any default of the same name.
        if (!empty($headers['headers'])) {
            foreach ($headers['headers'] as $name => $value) {
                $request = $request->withHeader($name, $value);
            }
        }

        // An array body is shorthand for `[type, data, format]`, read by position.
        if (\is_array($body)) {
            $type = '';
            $data = [];
            $format = null;
            $position = 0;
            foreach ($body as $key => $value) {
                $position++;
                switch ($position) {
                    case 1:
                        $type = (($key === 0) && \is_string($value)) ? $value : Body::FORM;
                        $data = \is_string($key) ? [$key => $value] : $value;
                        break;
                    case 2:
                        $data = (($key === 0) || \is_array($value)) ? $value : [$key => $value];
                        break;
                    case 3:
                        $format = $value;
                        break;
                }
            }

            $body = new Body($type, $data, $format);
        }

        // Attach the body together with its Content-Type and Content-Length headers.
        if ($body) {
            $describesType = $body instanceof BodyInterface;
            if ($describesType && $request->hasHeader('Content-Type') === false) {
                $request = $request->withHeader('Content-Type', $body->getContentType());
            }

            if ($request->hasHeader('Content-Length') === false) {
                $request = $request->withHeader('Content-Length', (string) $body->getSize());
            }

            $request = $request->withBody($body);
        }

        if ($this->hasZlib) {
            $request = $request->withAddedHeader('Accept-Encoding', 'gzip, deflate, identity');
        }

        $this->request = $request;

        return $request;
    }

    /**
     * @inheritdoc
     *
     * Opens the request URI with `fopen()` using a stream context built from
     * the request (its own options when present, the class defaults otherwise),
     * wraps the resource in an `AsyncStream` tagged with the current task id,
     * and builds the response from the wrapper's header lines. The body is
     * attached for every method except HEAD and OPTIONS, and is inflated when
     * encoding support is enabled and the response is gzip or deflate encoded.
     *
     * @param RequestInterface $request
     *
     * @return \Generator resolves to the response
     *
     * @throws NetworkException when opening fails on address lookup or a refused connection
     * @throws RequestException when opening fails otherwise, or the stream timeout cannot be set
     */
    public function sendRequest(RequestInterface $request) // Can't use `ResponseInterface` return type, cause method contains `yield`
    {
        $option = self::OPTIONS;
        $method = $request->getMethod();

        if ($request->getBody()->getSize()) {
            $request = $request->withHeader('Content-Length', (string) $request->getBody()->getSize());
        }

        $useOption = $request instanceof Request ? $request->getOptions() : [];
        $useOptions = empty($useOption) ? $option : $useOption;
        $options = \array_merge($useOptions, [
            'method' => $method,
            'protocol_version' => $request->getProtocolVersion(),
            'header' => $this->buildRequestHeaders($request->getHeaders()),
        ]);

        $context = [
            'http' => $options,
            'ssl' => [
                'disable_compression' => true
            ]
        ];

        if ($request->getBody()->getSize()) {
            if ($request->getBody() instanceof BodyInterface) {
                $context['http']['content'] = $request->getBody()->__toString();
            } else {
                $context['http']['content'] = yield $request->getBody()->getContents();
            }
        }

        $ctx = \stream_context_create($context);
        if ($request instanceof Request && $request->debugging()) {
            \stream_context_set_params($ctx, ['notification' => [$request, 'debug']]);
        }

        $url = $request->getUri()->__toString();
        yield;

        // Forget any earlier error so a failure below is reported with its own message.
        \error_clear_last();

        $start = \microtime(true);
        $resource = @\fopen($url, 'rb', false, $ctx);
        // `microtime(true)` counts seconds; the log messages label the value as milliseconds.
        $timer = (\microtime(true) - $start) * 1000;

        if (empty($this->httpId)) {
            $this->httpId = yield Kernel::getTask();
        }

        if (!\is_resource($resource)) {
            // No error is recorded when a custom error handler consumed the warning.
            $error = \error_get_last()['message'] ?? 'Unknown error: failed to open stream';
            $isNetworkFailure = (\strpos($error, 'getaddrinfo') !== false)
                || (\strpos($error, 'Connection refused') !== false);
            if ($isNetworkFailure) {
                $e = new NetworkException($error, $request);
            } else {
                $e = new RequestException($request, $error, 0);
            }

            yield \log_error(
                'On task: {taskId} {class}, failed In: {timer}ms on Timeout: {timeout} with Exception: {exception}',
                ['taskId' => $this->httpId, 'class' => __METHOD__, 'timer' => $timer, 'timeout' => $this->timeout, 'exception' => $e],
                $this->loggerName
            );

            throw $e;
        }

        yield \log_info(
            'On task: {taskId} {class}, {method} {url} Timeout: {timeout} Took: {timer}ms',
            ['taskId' => $this->httpId, 'class' => __METHOD__, 'method' => $method, 'url' => $url, 'timeout' => $this->timeout, 'timer' => $timer],
            $this->loggerName
        );

        $stream = AsyncStream::createFromResource($resource);
        if (!\stream_set_timeout($resource, (int) ($this->timeout * \RETRY_MULTIPLY))) {
            $stream->close();
            // `stream_set_timeout()` can fail without recording an error.
            $error = \error_get_last()['message'] ?? 'Unknown error: failed to set stream timeout';
            $e = new RequestException($request, $error, 0);
            yield \log_warning(
                'On task: {taskId} {class}, {method} {url} failed to Set: {timeout} Exception: {exception}',
                ['taskId' => $this->httpId, 'class' => __METHOD__, 'method' => $method, 'url' => $url, 'timeout' => ($this->timeout * \RETRY_MULTIPLY), 'exception' => $e],
                $this->loggerName
            );

            throw $e;
        }

        $headers = \stream_get_meta_data($resource)['wrapper_data'];

        // Add task id to stream instance
        $this->stream = $stream->withTask($this->httpId);

        // Keep only the header lines of the final response when redirects were followed.
        if ($option['follow_location']) {
            $headers = $this->filterResponseHeaders($headers);
        }

        // Status line: "HTTP/<version> <status> <reason>".
        $parts = \explode(' ', \array_shift($headers), 3);
        $version = \explode('/', $parts[0])[1];
        $status = (int) $parts[1];

        yield;
        if (($method == Request::METHOD_HEAD) || ($method == Request::METHOD_OPTIONS)) {
            $response = Response::create($status)
                ->withProtocolVersion($version);
        } else {
            $response = Response::create($status)
                ->withProtocolVersion($version)
                ->withBody($stream);
        }

        foreach ($this->buildResponseHeaders($headers) as $key => $value) {
            $response = $response->withHeader($key, $value);
        }

        if ($this->hasZlib) {
            $contentEncoding = \strtolower($response->getHeaderLine('Content-Encoding'));
            $zlibModes = ['gzip' => \ZLIB_ENCODING_GZIP, 'deflate' => \ZLIB_ENCODING_DEFLATE];

            // Any other (or no) content encoding leaves the body untouched.
            if (isset($zlibModes[$contentEncoding])) {
                yield;
                if ($response->getBody()->isSeekable()) {
                    $response->getBody()->rewind();
                }

                $response = $response->withBody($stream->inflate($zlibModes[$contentEncoding]));
                // The headers described the encoded payload and no longer apply.
                $response = $response->withoutHeader('Content-Encoding');
                $response = $response->withoutHeader('Content-Length');
            }
        }

        return $response;
    }

    /**
     * Flatten a header map into `Name: value` lines, one line per value.
     *
     * @param array $requestHeaders header name => list of values
     * @return array<string>
     */
    protected function buildRequestHeaders(array $requestHeaders): array
    {
        $lines = [];

        foreach ($requestHeaders as $name => $values) {
            foreach ($values as $value) {
                $lines[] = $name . ': ' . $value;
            }
        }

        return $lines;
    }

    /**
     * Group raw response header lines by header name.
     *
     * Each line is split on its first colon; name and value are trimmed. A line
     * without a colon yields an empty value, and a line without a name is
     * skipped. Repeated names collect their values in order.
     *
     * @param array $lines raw `Name: value` header lines
     * @return array<string, array<int, string>> header name => list of values
     */
    protected function buildResponseHeaders(array $lines): array
    {
        $headers = [];
        foreach ($lines as $line) {
            $parts = \explode(':', $line, 2);

            $name = \trim($parts[0]);
            if ($name === '') {
                continue;
            }

            // Default to '' rather than null: `trim(null)` is a TypeError under strict types.
            $headers[$name][] = \trim($parts[1] ?? '');
        }

        return $headers;
    }

    /**
     * Keep only the header lines belonging to the last response.
     *
     * Every line starting with `HTTP/` begins a new response, so the result
     * runs from the last such line to the end; all lines are kept when there
     * is none.
     *
     * @param array $headers
     *
     * @return array
     */
    protected function filterResponseHeaders(array $headers): array
    {
        $lines = \array_values($headers);

        $lastStatusLine = 0;
        foreach ($lines as $position => $line) {
            if (\strncmp($line, 'HTTP/', 5) === 0) {
                $lastStatusLine = $position;
            }
        }

        return \array_slice($lines, $lastStatusLine);
    }

    /**
     * Build the value of an `Authorization` header from an authorization section.
     *
     * The first of these that applies wins:
     * - `auth_basic` => `[username, password]` or `[username]`: HTTP Basic
     * - `auth_bearer` => token: HTTP Bearer (token authentication)
     * - `auth_digest` => non-empty list: HTTP Digest, assembled from the scalar
     *   entries of the section itself, leaving out `password`
     *
     * @param array|null $authorize authorization section
     *
     * @return string the header value, or an empty string when nothing applies
     */
    protected function authorization(?array $authorize = null): string
    {
        if (empty($authorize)) {
            return '';
        }

        if (isset($authorize['auth_basic'][0])) {
            $credentials = $authorize['auth_basic'][0];
            if (isset($authorize['auth_basic'][1])) {
                // Username and password; otherwise the username goes out alone.
                $credentials = $credentials . ':' . $authorize['auth_basic'][1];
            }

            return 'Basic ' . \base64_encode($credentials);
        }

        if (isset($authorize['auth_bearer'])) {
            return 'Bearer ' . $authorize['auth_bearer'];
        }

        if (isset($authorize['auth_digest'][0])) {
            // NOTE(review): parameters are read from the section itself, as before;
            // confirm callers do not nest them under `auth_digest`.
            $parameters = [];
            foreach ($authorize as $name => $value) {
                if (empty($name) || empty($value) || $name === 'password') {
                    continue;
                }

                // Skips the `auth_digest` list itself, which cannot be rendered as a value.
                if (!\is_scalar($value)) {
                    continue;
                }

                $parameters[] = $name . '="' . $value . '"';
            }

            return 'Digest ' . \implode(', ', $parameters);
        }

        return '';
    }

    /**
     * Split the caller's positional sections into a header set and an option set.
     *
     * Read by position: the first section describes the authorization, the
     * second the headers, and every further section is merged into the options.
     * A generated `Authorization` header is merged with the caller's headers.
     *
     * @param array ...$headersOptions only the first argument is used; it holds the sections
     *
     * @return array `[['headers' => [...]], $options]`, or `[[], $options]` when there are no headers
     */
    protected function optionsHeaderSplicer(array ...$headersOptions): array
    {
        $headersOptions = $headersOptions[0];
        $authorizer = $headers = $options = [];

        // Unwrap one extra level of nesting when the sections arrive wrapped in a list.
        if (isset($headersOptions[0][0])) {
            $first = \array_shift($headersOptions);
            if (empty($headersOptions)) {
                $headersOptions = $first;
            }
        }

        if (\is_array($headersOptions)) {
            $position = 0;
            foreach ($headersOptions as $section) {
                $position++;

                if ($position === 1) {
                    if (!empty($section)) {
                        $authorization = $this->authorization($section);
                        $authorizer = !empty($authorization) ? ['Authorization' => $authorization] : [];
                    }
                } elseif ($position === 2) {
                    $headers = $section;
                } else {
                    $options = \array_merge($options, $section);
                }
            }
        }

        $combined = !empty($authorizer) ? \array_merge($authorizer, $headers) : $headers;

        if (empty($combined)) {
            return [[], $options];
        }

        return [['headers' => $combined], $options];
    }
}