symplely/hyper

View on GitHub
Request/AsyncStream.php

Summary

Maintainability
C
1 day
Test Coverage
<?php

declare(strict_types=1);

namespace Async\Request;

use Async\Coroutine\Kernel;
use Async\Coroutine\Coroutine;
use Async\Coroutine\Exceptions\RuntimeException;
use Async\Coroutine\Exceptions\InvalidArgumentException;
use Psr\Http\Message\StreamInterface;

/**
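 * PSR-7 `StreamInterface` implementation wrapping a PHP stream resource.
 *
 * Streams are switched to non-blocking mode on construction; `getContents()`,
 * `read()`, `write()` and several factories are generators that yield
 * `Kernel` wait requests, with optional incremental zlib inflate/deflate.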
 *
 * @package Async\Request\AsyncStream
 */
class AsyncStream implements StreamInterface
{
    /**
     * @var string[]
     */
    private const WRITABLE_MODES = ['r+', 'w', 'w+', 'a', 'a+', 'x', 'x+', 'c', 'c+'];

    /**
     * @var string[]
     */
    private const READABLE_MODES = ['r', 'r+', 'w+', 'a+', 'x+', 'c+'];

    /**
     * Stream of data.
     *
     * @var resource|null
     */
    private $resource;

    /**
     * @var bool
     */
    private $seekable;

    /**
     * @var bool
     */
    private $readable;

    /**
     * @var bool
     */
    private $writable;

    /**
     * @var string|null
     */
    private $uri;

    /**
     * @var int|null
     */
    private $size;

    /**
     * The stream's associated `Task` id, if any.
     *
     * @var int|null
     */
    private $hyperId;

    /**
     * Whether the stream supports gzip and `inflate/deflate` content encoding.
     *
     * @var bool
     */
    private $hasZlib;

    /**
     * @var resource|\InflateContext|null Incremental inflate context (an object on PHP >= 8.0).
     */
    private $contextInflate;

    /**
     * @var resource|\DeflateContext|null Incremental deflate context (an object on PHP >= 8.0).
     */
    private $contextDeflate;

    /**
     * @var int
     */
    private $zlibEncoding;

    /**
     * @var resource[]
     */
    private static $nonBlocking = [];

    /**
     * Wrap a PHP stream resource and switch it to non-blocking, unbuffered mode.
     *
     * When no stream is given a fresh `php://temp` stream is opened. The optional
     * zlib context is initialised before the resource is attached.
     *
     * @param resource|null $stream Stream resource to wrap, or `null` for a new `php://temp` stream.
     * @param string|null $zlib Either `'inflate'` or `'deflate'` to set up an incremental zlib context.
     * @param int $encoding Encoding forwarded to `inflate()` / `deflate()`.
     * @param int $level Compression level forwarded to `deflate()`.
     *
     * @throws InvalidArgumentException If `$stream` is neither `null` nor a stream resource.
     * @throws RuntimeException If the fallback `php://temp` stream cannot be opened.
     */
    public function __construct(
        $stream = null,
        ?string $zlib = null,
        int $encoding = 0,
        int $level = 1
    ) {
        if ($zlib === 'inflate') {
            $this->inflate($encoding);
        } elseif ($zlib === 'deflate') {
            $this->deflate($encoding, $level);
        }

        // Strict comparison on purpose: a failed `fopen()` result (`false`), `''` or `0`
        // must be rejected below rather than silently swapped for a temporary stream.
        if ($stream === null) {
            $stream = @\fopen('php://temp', 'rb+');
            if ($stream === false) {
                throw new RuntimeException('Unable to open temporary stream.');
            }
        } elseif (!\is_resource($stream) || 'stream' !== \get_resource_type($stream)) {
            throw new InvalidArgumentException(
                'Invalid stream provided; must be a string stream identifier or stream resource'
            );
        }

        self::setNonBlocking($stream);
        $this->resource = $stream;

        // Cache the capabilities once; detach() resets them.
        $meta = $this->getMetadata();
        $this->uri = $meta['uri'] ?? null;
        $this->seekable = $meta['seekable'] ?? false;
        $this->writable = $this->isWritable();
        $this->readable = $this->isReadable();
    }

    /**
     * Associate this stream with a task id.
     *
     * The id is stored on the instance and emitted as `httpId` in the log
     * entries written by `getContents()`, `read()` and `write()`.
     *
     * @param int|null $hyperId Task id, or `null` for none.
     *
     * @return $this
     */
    public function withTask(?int $hyperId)
    {
        $this->hyperId = $hyperId;

        return $this;
    }

    /**
     * Initialize an incremental `inflate` context.
     *
     * Nothing happens when `$encoding` is `0` (no encoding requested) or when the
     * zlib `inflate_init()` function is unavailable.
     *
     * @param int $encoding compression algorithm used, one of the `ZLIB_ENCODING_*`
     *                      constants, see `inflate_init()`
     *
     * @return $this
     *
     * @see http://php.net/manual/en/function.inflate-init.php
     */
    public function inflate(int $encoding = 0)
    {
        // Test against zero rather than `> 0`: `ZLIB_ENCODING_RAW` is a negative
        // value and is the default encoding produced by `deflate()`.
        if (($encoding !== 0) && \function_exists('inflate_init')) {
            $this->hasZlib = true;
            $this->zlibEncoding = $encoding;
            $this->contextInflate = @\inflate_init($this->zlibEncoding);
        }

        return $this;
    }

    /**
     * Check whether an incremental `inflate` context is currently active.
     *
     * @return bool `true` when zlib support was enabled and a usable inflate context exists.
     */
    public function isInflate(): bool
    {
        $context = $this->contextInflate;

        // `inflate_init()` returns a resource on PHP 7 but an `InflateContext`
        // object on PHP >= 8.0, so `is_resource()` alone is always false there.
        return (bool) $this->hasZlib
            && (\is_resource($context) || $context instanceof \InflateContext);
    }

    /**
     * Set up an incremental `deflate` context on this stream.
     *
     * Does nothing when the zlib `deflate_init()` function is unavailable.
     *
     * @param int $encoding compression algorithm, see `deflate_init()`; `0` selects `ZLIB_ENCODING_RAW`.
     * @param int $level compression level passed as the `level` option.
     *
     * @return $this
     *
     * @see http://php.net/manual/en/function.deflate-init.php
     */
    public function deflate(int $encoding = 0, int $level = 1)
    {
        if (!\function_exists('deflate_init')) {
            return $this;
        }

        $this->hasZlib = true;
        $this->zlibEncoding = (0 === $encoding) ? \ZLIB_ENCODING_RAW : $encoding;

        $options = ['level' => $level];
        $this->contextDeflate = @\deflate_init($this->zlibEncoding, $options);

        return $this;
    }

    /**
     * Check whether an incremental `deflate` context is currently active.
     *
     * @return bool `true` when zlib support was enabled and a usable deflate context exists.
     */
    public function isDeflate(): bool
    {
        $context = $this->contextDeflate;

        // `deflate_init()` returns a resource on PHP 7 but a `DeflateContext`
        // object on PHP >= 8.0, so `is_resource()` alone is always false there.
        return (bool) $this->hasZlib
            && (\is_resource($context) || $context instanceof \DeflateContext);
    }

    /**
     * Size of the underlying stream in bytes, as reported by `fstat()`.
     *
     * The value is cached on the instance until `write()` or `detach()` resets it.
     *
     * @return int|null Size in bytes, or `null` when the stream is detached or the size is unknown.
     */
    public function getSize(): ?int
    {
        $resource = $this->getResource();
        if (null === $resource) {
            return null;
        }

        if (null === $this->size) {
            // Drop any cached stat data for URI-backed streams before asking again.
            if ($this->uri) {
                \clearstatcache(true, $this->uri);
            }

            $info = \fstat($resource);
            if (!isset($info['size'])) {
                return null;
            }

            $this->size = $info['size'];
        }

        return $this->size;
    }

    /**
     * Current position of the read/write pointer.
     *
     * @return int Offset reported by `ftell()`.
     *
     * @throws RuntimeException If the stream is detached or the position cannot be determined.
     */
    public function tell(): int
    {
        $resource = $this->getResource();
        if (null === $resource) {
            throw new RuntimeException('Stream is not open.');
        }

        $offset = \ftell($resource);
        if (false !== $offset) {
            return $offset;
        }

        throw new RuntimeException('Unable to get position of stream.');
    }

    /**
     * Move the pointer back to the beginning of the stream.
     *
     * @throws RuntimeException If the stream is detached or `rewind()` fails.
     */
    public function rewind(): void
    {
        $resource = $this->getResource();
        if (null === $resource) {
            throw new RuntimeException('Stream is not open.');
        }

        $rewound = \rewind($resource);
        if (false === $rewound) {
            throw new RuntimeException('Failed to rewind stream.');
        }
    }

    /**
     * Whether the underlying stream reports itself as seekable.
     *
     * @return bool The `seekable` metadata flag; `false` when the stream is detached or the flag is missing.
     */
    public function isSeekable(): bool
    {
        if (null === $this->getResource()) {
            return false;
        }

        return $this->getMetadata('seekable') ?? false;
    }

    /**
     * Move the stream pointer.
     *
     * @param int $offset Offset handed to `fseek()`.
     * @param int $whence One of the `SEEK_*` constants, handed to `fseek()`.
     *
     * @throws RuntimeException If the stream is detached or `fseek()` reports failure.
     */
    public function seek($offset, $whence = \SEEK_SET): void
    {
        $resource = $this->getResource();
        if (null === $resource) {
            throw new RuntimeException('Stream is not open.');
        }

        // fseek() signals failure with a negative status.
        $status = \fseek($resource, $offset, $whence);
        if ($status < 0) {
            $message = \sprintf('Failed to seek to offset %s.', $offset);

            throw new RuntimeException($message);
        }
    }

    /**
     * Read the whole stream into a string, from the beginning when seekable.
     *
     * `getContents()` is a generator in this class, so it cannot be returned
     * here; the bytes are read synchronously with `stream_get_contents()`
     * instead. The raw bytes are returned: no inflate context is applied.
     * Any failure yields an empty string, since `__toString()` must not throw.
     *
     * @return string Stream contents, or `''` when detached, unreadable or on error.
     */
    public function __toString()
    {
        $handle = $this->getResource();
        if ($handle === null || !$this->isReadable()) {
            return '';
        }

        try {
            if ($this->seekable) {
                $this->seek(0);
            }

            $contents = \stream_get_contents($handle);
        } catch (\Throwable $e) {
            // Deliberate best-effort: string conversion reports failure as ''.
            return '';
        }

        return \is_string($contents) ? $contents : '';
    }

    /**
     * Detach the underlying resource from this object and close it.
     *
     * @return void
     */
    public function close()
    {
        $resource = $this->detach();
        if (!\is_resource($resource)) {
            return;
        }

        \fclose($resource);
    }

    /**
     * Separate the underlying resource from this object and reset all state.
     *
     * After this call the stream reports itself as not readable, writable or
     * seekable, and any zlib context and task id are dropped.
     *
     * @return resource|null The previously attached resource, if any.
     */
    public function detach()
    {
        $resource = $this->resource;

        // Forget only this stream's registry entry; the registry is static and
        // shared by every stream, so it must not be wiped wholesale.
        if ($resource !== null) {
            unset(self::$nonBlocking[(int) $resource]);
        }

        $this->resource = null;
        $this->size = null;
        $this->uri = null;
        $this->readable = false;
        $this->writable = false;
        $this->seekable = false;
        $this->hyperId = null;
        $this->contextInflate = null;
        $this->contextDeflate = null;
        $this->zlibEncoding = null;
        $this->hasZlib = false;

        return $resource;
    }

    /**
     * Whether the stream pointer is at end-of-file.
     *
     * @return bool Result of `feof()`; `true` when the stream is detached.
     */
    public function eof()
    {
        $resource = $this->getResource();

        return (null === $resource) ? true : \feof($resource);
    }

    /**
     * Generator that reads from the stream until a read comes back empty,
     * fails, or takes too long, and then yields the accumulated buffer.
     *
     * Each iteration yields a `Kernel::readWait()` request, then reads one chunk
     * (through `inflateRead()` when an inflate context is active, otherwise
     * `fread()` with the global `FETCH_CHUNK` size). The loop stops when the
     * wait plus read took 0.25 seconds or more, or when the chunk is not a
     * string, or when it is an empty string.
     *
     * NOTE(review): `FETCH_CHUNK`, `log_notice()`, `log_critical()` and
     * `hyper_loggerName()` are defined outside this file; presumably the
     * scheduler resumes this generator once the handle is readable and
     * `Coroutine::value()` hands the buffer to the awaiting coroutine - confirm.
     *
     * @return \Generator
     *
     * @throws RuntimeException If the stream is detached or not readable.
     */
    public function getContents()
    {
        // Bare yield before any work is done.
        yield;
        $handle =  $this->getResource();
        if ($this->isReadable() && ($handle !== null)) {
            $buffer = "";
            $start = \microtime(true);
            while (true) {
                // $begin is taken before the wait, so $time_used below covers wait + read.
                $begin = \microtime(true);
                yield Kernel::readWait($handle, true);
                if ($this->isInflate()) {
                    $new = self::inflateRead($this, $handle);
                } else {
                    $new = \fread($handle, \FETCH_CHUNK);
                }

                $end = \microtime(true);
                if (\is_string($new) && \strlen($new) >= 1) {
                    $buffer .= $new;
                }

                $time_used = $end - $begin;
                // Stop on a slow iteration, a failed read, or an empty read.
                if (($time_used >= 0.25) || !\is_string($new) || (\is_string($new) && \strlen($new) < 1)) {
                    // isInflate() is re-checked because inflateRead() may already have
                    // finished and cleared the context; if not, flush what remains.
                    if ($this->isInflate()) {
                        $data = @\inflate_add($this->contextInflate, '', \ZLIB_FINISH);
                        if ($data !== false)
                            $buffer .= $data;
                        $this->contextInflate = null;
                    }

                    break;
                }
            }

            // NOTE(review): $timer is in seconds (microtime difference) although the
            // log template labels it "ms".
            $timer = \microtime(true) - $start;
            // $buffer starts as "" and only has strings appended, so this test is
            // always true and the else branch below cannot be reached.
            if (false !== $buffer) {
                yield \log_notice(
                    'On task: {httpId} {class}, {url} Received: {transferred} bytes Took: {timer}ms',
                    ['httpId' => $this->hyperId, 'class' => __METHOD__, 'url' => $this->uri, 'transferred' => \strlen($buffer), 'timer' => $timer],
                    \hyper_loggerName()
                );

                yield Coroutine::value($buffer);
            } else {
                yield \log_critical('Unable to get contents from underlying resource', \hyper_loggerName());
                throw new RuntimeException('Unable to get contents from underlying resource');
            }
        } else {
            yield \log_critical('Underlying resource is not readable', \hyper_loggerName());
            throw new RuntimeException('Underlying resource is not readable');
        }
    }

    /**
     * Generator that performs a single read from the stream.
     *
     * Yields a `Kernel::readWait()` request, reads once (through `inflateRead()`
     * when an inflate context is active, in which case `$length` is not used for
     * the read), finishes and clears any remaining inflate context, logs the
     * transfer and yields the data wrapped in `Coroutine::value()`.
     *
     * @param int $length Maximum number of bytes to read; `0` yields an empty string.
     *
     * @return \Generator
     *
     * @throws RuntimeException If the stream is not readable, `$length` is negative, or the read fails.
     */
    public function read($length)
    {
        $resource = $this->getResource();
        if (!$this->isReadable() || (null === $resource)) {
            yield \log_critical('Stream is not readable', \hyper_loggerName());
            throw new RuntimeException('Stream is not readable');
        }

        if ($length < 0) {
            yield \log_critical('Length parameter cannot be negative', \hyper_loggerName());
            throw new RuntimeException('Length parameter cannot be negative');
        }

        if (0 === $length) {
            yield Coroutine::value('');

            return;
        }

        $startedAt = \microtime(true);
        yield Kernel::readWait($resource, true);
        $payload = $this->isInflate()
            ? self::inflateRead($this, $resource)
            : \fread($resource, $length);
        $elapsed = \microtime(true) - $startedAt;

        // Checked again on purpose: inflateRead() may already have cleared the context.
        if ($this->isInflate()) {
            $tail = @\inflate_add($this->contextInflate, '', \ZLIB_FINISH);
            if (false !== $tail) {
                $payload .= $tail;
            }

            $this->contextInflate = null;
        }

        if (false === $payload) {
            yield \log_critical('Unable to read from underlying resource', \hyper_loggerName());
            throw new RuntimeException('Unable to read from underlying resource');
        }

        yield \log_notice(
            'On task: {httpId} {class}, {url} Read: {read} bytes Took: {timer}ms',
            ['httpId' => $this->hyperId, 'class' => __METHOD__, 'url' => $this->uri, 'read' => \strlen($payload), 'timer' => $elapsed],
            \hyper_loggerName()
        );

        yield Coroutine::value($payload);
    }

    /**
     * Read up to 8192 bytes from the handle and inflate them.
     *
     * When the inflated chunk is non-empty and fewer than 8192 raw bytes were
     * read, the inflate context is finished and cleared on `$stream`.
     *
     * A failed `fread()` or `inflate_add()` returns `false`, so callers can tell
     * corrupt input apart from end-of-data. The context is cleared on inflate
     * failure so callers do not attempt to finish a broken context.
     *
     * @param AsyncStream $stream Stream owning the inflate context.
     * @param resource $handle Handle to read the compressed bytes from.
     *
     * @return string|false Inflated data (possibly empty), or `false` on failure.
     */
    protected static function inflateRead(AsyncStream $stream, $handle)
    {
        $data = \fread($handle, 8192);
        if ($data === false) {
            return false;
        }

        $chunk = @\inflate_add($stream->contextInflate, $data, \ZLIB_SYNC_FLUSH);
        if ($chunk === false) {
            // Without this guard `.=` below would turn the failure into ''.
            $stream->contextInflate = null;

            return false;
        }

        // A short raw read is taken as the last one: flush and drop the context.
        if ($chunk !== '' && \strlen($data) < 8192) {
            $tail = @\inflate_add($stream->contextInflate, '', \ZLIB_FINISH);
            if ($tail !== false) {
                $chunk .= $tail;
            }

            $stream->contextInflate = null;
        }

        return $chunk;
    }

    /**
     * Generator that writes data to the stream.
     *
     * Yields a `Kernel::writeWait()` request, writes once (through
     * `deflateWrite()` when a deflate context is active, otherwise `fwrite()`),
     * logs the transfer and yields the byte count wrapped in `Coroutine::value()`.
     *
     * @param string $string Data to write.
     *
     * @return \Generator
     *
     * @throws RuntimeException If the stream is not writable or the write fails.
     */
    public function write($string)
    {
        $resource = $this->getResource();
        if (!$this->isWritable() || (null === $resource)) {
            yield \log_critical('Stream is not writable', \hyper_loggerName());
            throw new RuntimeException('Stream is not writable');
        }

        // The cached size is stale once anything is written.
        $this->size = null;

        $startedAt = \microtime(true);
        yield Kernel::writeWait($resource, true);
        $bytes = $this->isDeflate()
            ? self::deflateWrite($this, $resource, $string)
            : \fwrite($resource, $string);
        $elapsed = \microtime(true) - $startedAt;

        if (false === $bytes) {
            yield \log_critical('Unable to write to underlying resource', \hyper_loggerName());
            throw new RuntimeException('Unable to write to underlying resource');
        }

        yield \log_notice(
            'On task: {httpId} {class}, Response: {url} Written: {written} bytes Took: {timer}ms',
            ['httpId' => $this->hyperId, 'class' => __METHOD__, 'url' => $this->uri, 'written' => $bytes, 'timer' => $elapsed],
            \hyper_loggerName()
        );

        yield Coroutine::value($bytes);
    }

    /**
     * Deflate a chunk and write the compressed bytes to the handle.
     *
     * The deflate context is cleared on `$stream` after the first non-empty
     * compressed chunk has been written.
     *
     * NOTE(review): the chunk is flushed with `ZLIB_FULL_FLUSH` and the context
     * is then dropped without a `ZLIB_FINISH` call - confirm that consumers
     * accept a stream without a final block.
     *
     * @param AsyncStream $stream Stream owning the deflate context.
     * @param resource $handle Handle to write the compressed bytes to.
     * @param mixed $chunk Data to compress.
     *
     * @return int|false Number of compressed bytes written (`0` when nothing was
     *                   produced), or `false` when compression or the write fails.
     */
    protected static function deflateWrite(AsyncStream $stream, $handle, $chunk)
    {
        $compressed = \deflate_add($stream->contextDeflate, $chunk, \ZLIB_FULL_FLUSH);
        if ($compressed === false) {
            // Passing `false` on to fwrite() would raise a TypeError under
            // strict_types; report a failed write instead.
            return false;
        }

        if ($compressed === '') {
            return 0;
        }

        $written = \fwrite($handle, $compressed);
        $stream->contextDeflate = null;

        return $written;
    }

    /**
     * Stream metadata as reported by `stream_get_meta_data()`.
     *
     * @param string|null $key Specific metadata key to fetch; a falsy value returns the whole array.
     *
     * @return array|mixed|null The whole metadata array, the value stored under `$key`,
     *                          or `null` when the stream is detached or the key is not set.
     */
    public function getMetadata($key = null)
    {
        $resource = $this->getResource();
        if (null === $resource) {
            return null;
        }

        $meta = \stream_get_meta_data($resource);
        if (!$key) {
            return $meta;
        }

        return $meta[$key] ?? null;
    }

    /**
     * Whether the stream was opened in a mode that allows reading.
     *
     * The `mode` metadata is normalised by removing the `b`, `t` and `e` flags
     * and then compared against `READABLE_MODES`.
     *
     * @return bool `false` when the stream is detached or its mode is unknown.
     */
    public function isReadable()
    {
        $handle = $this->getResource();

        if ($handle === null) {
            return false;
        }

        $mode = $this->getMetadata('mode');
        if ($mode === null) {
            return false;
        }

        // `t` (text-mode translation) is a flag like `b`; leaving it in place
        // made modes such as `rt` or `r+t` look unreadable.
        $mode = \str_replace(['b', 't', 'e'], '', $mode);

        return \in_array($mode, self::READABLE_MODES, true);
    }

    /**
     * Whether the stream was opened in a mode that allows writing.
     *
     * The `mode` metadata is normalised by removing the `b`, `t` and `e` flags
     * and then compared against `WRITABLE_MODES`.
     *
     * @param mixed $stream Unused; kept for signature compatibility.
     *
     * @return bool `false` when the stream is detached or its mode is unknown.
     */
    public function isWritable($stream = null)
    {
        $handle = $this->getResource();

        if ($handle === null) {
            return false;
        }

        $mode = $this->getMetadata('mode');
        if ($mode === null) {
            return false;
        }

        // `t` (text-mode translation) is a flag like `b`; leaving it in place
        // made modes such as `wt` or `w+t` look unwritable.
        $mode = \str_replace(['b', 't', 'e'], '', $mode);

        return \in_array($mode, self::WRITABLE_MODES, true);
    }

    /**
     * The attached stream resource, if it is still a valid resource.
     *
     * @return resource|null `null` when nothing is attached or the resource is no longer valid.
     */
    public function getResource()
    {
        return \is_resource($this->resource) ? $this->resource : null;
    }

    /**
     * Generator that builds a `php://temp` backed stream populated with a string.
     *
     * The content is written through `write()`, the stream is rewound, and the
     * stream is the generator's return value.
     *
     * @param string $content String content with which to populate the stream.
     * @param string|null $zlib `'inflate'` or `'deflate'` to attach a zlib context, if available.
     * @param int $encoding Encoding forwarded to the constructor.
     * @param int $level Compression level forwarded to the constructor.
     *
     * @return \Generator Returning the new AsyncStream.
     * @throws InvalidArgumentException for invalid streams or resources.
     */
    public static function create(
        string $content = '',
        ?string $zlib = null,
        int $encoding = 0,
        int $level = 1
    ) {
        $instance = new self(@\fopen('php://temp', 'rb+'), $zlib, $encoding, $level);

        yield $instance->write($content);
        $instance->rewind();

        return $instance;
    }

    /**
     * Create a stream from an existing file.
     *
     * The file is opened using the given mode, which may be any mode
     * supported by the `fopen` function.
     *
     * The `$filename` MAY be any string supported by `fopen()`.
     *
     * @param string $filename Filename or stream URI to use as basis of stream.
     * @param string $mode Mode with which to open the underlying filename/stream.
     *
     * @return AsyncStream
     * @throws RuntimeException If the file cannot be opened.
     */
    public static function createFromFile(string $filename, string $mode = 'r'): AsyncStream
    {
        $resource = \fopen($filename, $mode);
        if ($resource === false) {
            // Fail loudly rather than handing the failed result to the constructor.
            throw new RuntimeException(
                \sprintf('Unable to open "%s" using mode "%s".', $filename, $mode)
            );
        }

        return new self($resource);
    }

    /**
     * Generator that creates a deflate stream from an existing file.
     *
     * The file is opened with the given mode and read completely through
     * `getContents()`; the source stream is then closed and a new stream is
     * built from the contents via `create(..., 'deflate')`. That new stream is
     * the generator's return value.
     *
     * The `$filename` MAY be any string supported by `fopen()`.
     *
     * @param string $filename Filename or stream URI to use as basis of stream.
     * @param string $mode Mode with which to open the underlying filename/stream.
     *
     * @return \Generator Returning the new AsyncStream.
     * @throws RuntimeException If the file cannot be opened or read.
     */
    public static function createDeflateFromFile(string $filename, string $mode = 'r')
    {
        $handle = \fopen($filename, $mode);
        if ($handle === false) {
            // Fail loudly rather than handing the failed result to the constructor.
            throw new RuntimeException(
                \sprintf('Unable to open "%s" using mode "%s".', $filename, $mode)
            );
        }

        $source = new self($handle);
        try {
            $contents = yield $source->getContents();
        } finally {
            // Release the file handle even when reading fails.
            $source->close();
        }

        $instance = yield self::create($contents, 'deflate');

        return $instance;
    }

    /**
     * Wrap an already opened PHP stream resource.
     *
     * The resource is handed to the constructor unchanged.
     *
     * @param resource $resource PHP resource to use as basis of stream.
     *
     * @return AsyncStream
     * @throws InvalidArgumentException If `$resource` is not a stream resource.
     */
    public static function createFromResource($resource): AsyncStream
    {
        $instance = new self($resource);

        return $instance;
    }

    /**
     * Generator that copies everything from one stream into another.
     *
     * `AsyncStream` arguments are unwrapped to their resources. A seekable
     * source is rewound first; a missing destination defaults to a new
     * `php://temp` stream. Both resources are switched to non-blocking mode and
     * data is moved in `FETCH_CHUNK` sized pieces, yielding `Kernel::readWait()`
     * and `Kernel::writeWait()` requests around each transfer. The rewound
     * destination stream is the generator's return value.
     *
     * NOTE(review): the byte count returned by `fwrite()` is not compared with
     * the chunk length, so a short write on a non-blocking destination would go
     * unnoticed - confirm whether that can happen for the intended targets.
     *
     * @param AsyncStream|resource $source
     * @param AsyncStream|resource|null $destination
     *
     * @return \Generator Returning an AsyncStream wrapping the destination.
     * @throws InvalidArgumentException If source or destination is not a resource.
     * @throws RuntimeException If reading from the source or writing to the destination fails.
     */
    public static function copyResource($source, $destination = null)
    {
        $source = $source instanceof AsyncStream ? $source->getResource() : $source;
        $destination = $destination instanceof AsyncStream ? $destination->getResource() : $destination;
        if (!\is_resource($source)) {
            throw new InvalidArgumentException('Not resource.');
        }

        if (\stream_get_meta_data($source)['seekable']) {
            \rewind($source);
        }

        if (empty($destination)) {
            $destination = \fopen('php://temp', 'rb+');
        }

        self::setNonBlocking($source);
        if (!\is_resource($destination)) {
            throw new InvalidArgumentException('Not resource.');
        }

        self::setNonBlocking($destination);
        while (!\feof($source)) {
            yield Kernel::readWait($source, true);
            $data = \stream_get_contents($source, \FETCH_CHUNK);
            if ($data === false) {
                // strlen(false) would raise a TypeError under strict_types.
                throw new RuntimeException('Unable to read from underlying resource');
            }

            if ($data === '') {
                continue;
            }

            yield Kernel::writeWait($destination, true);
            if (false === \fwrite($destination, $data)) {
                throw new RuntimeException('Unable to write to underlying resource');
            }
        }

        $stream = new self($destination);
        $stream->rewind();

        return $stream;
    }

    /**
     * Create a pair of connected stream sockets, both set to non-blocking mode.
     *
     * The `STREAM_PF_INET` domain is used when `DIRECTORY_SEPARATOR` is a
     * backslash, `STREAM_PF_UNIX` otherwise.
     *
     * @return resource[] Pair of non-blocking socket resources.
     *
     * @throws RuntimeException If creating the sockets fails.
     */
    public static function pair(): array
    {
        $domain = ('\\' == \DIRECTORY_SEPARATOR) ? \STREAM_PF_INET : \STREAM_PF_UNIX;

        $sockets = \stream_socket_pair($domain, \STREAM_SOCK_STREAM, \STREAM_IPPROTO_IP);
        if (false !== $sockets) {
            return [self::setNonBlocking($sockets[0]), self::setNonBlocking($sockets[1])];
        }

        // Append the last PHP error, when one is recorded, to aid diagnosis.
        $message = 'Failed to create socket pair.';
        $error = \error_get_last();
        if ($error) {
            $message .= \sprintf(' Errno: %d; %s', $error['type'], $error['message']);
        }

        throw new RuntimeException($message);
    }

    /**
     * Switch a stream to non-blocking mode and disable its read/write buffers.
     *
     * Whether the switch succeeded is recorded in the static registry under the
     * resource's integer id.
     *
     * @param resource $socket Stream resource to configure.
     *
     * @return resource The same resource that was passed in.
     */
    public static function setNonBlocking($socket)
    {
        $id = (int) $socket;

        self::$nonBlocking[$id] = true;
        if (false === \stream_set_blocking($socket, false)) {
            self::$nonBlocking[$id] = false;
        }

        // A buffer size of 0 makes reads and writes unbuffered.
        \stream_set_read_buffer($socket, 0);
        \stream_set_write_buffer($socket, 0);

        return $socket;
    }
}