symplely/processor

View on GitHub
Processor/Channel.php

Summary

Maintainability
A
2 hrs
Test Coverage
<?php

declare(strict_types=1);

namespace Async\Processor;

use Async\Processor\Process;
use Async\Processor\ChannelInterface;

/**
 * A channel is used to transfer messages between a `Process` as an IPC pipe.
 *
 * Messages queued with `send()` are handed out by `getIterator()`, while
 * `read()`, `write()`, `error()` and `passthru()` operate directly on the
 * configured stream resources (the standard streams unless `setup()` replaced them).
 *
 * @author Nicolas Grekas <p@tchwork.com>
 */
class Channel implements ChannelInterface
{
    /**
     * Callback invoked with this channel once the queue has been drained
     * while the channel is still open; its return value is queued via `send()`.
     *
     * @var callable|null
     */
    private $whenDrained = null;

    /**
     * Messages queued by `send()` that `getIterator()` has not consumed yet.
     *
     * @var array
     */
    private $input = [];

    /**
     * Whether the channel currently accepts new messages.
     *
     * @var bool
     */
    private $open = true;

    /**
     * IPC handle
     *
     * @var object|null
     */
    protected $channel = null;

    /**
     * Stream that `read()` and `passthru()` consume.
     *
     * @var resource
     */
    protected $ipcInput = \STDIN;

    /**
     * Stream that `write()` and `passthru()` write to.
     *
     * @var resource
     */
    protected $ipcOutput = \STDOUT;

    /**
     * Stream that `error()` writes to.
     *
     * @var resource
     */
    protected $ipcError = \STDERR;

    /**
     * Attaches the IPC handle and, optionally, replaces the streams used for I/O.
     *
     * @param object   $handle The IPC handle; `receive()` calls `getResult()` on it
     * @param resource $input  Stream to read from
     * @param resource $output Stream to write regular output to
     * @param resource $error  Stream to write error output to
     *
     * @return ChannelInterface This channel, for chaining
     */
    public function setup(object $handle, $input = \STDIN, $output = \STDOUT, $error = \STDERR): ChannelInterface
    {
        $this->channel = $handle;
        $this->ipcInput = $input;
        $this->ipcOutput = $output;
        $this->ipcError = $error;

        return $this;
    }

    /**
     * Sets (or clears, with `null`) the callback to run when the queue is drained.
     *
     * @param callable|null $whenDrained Receives this channel; its return value is passed to `send()`
     *
     * @return ChannelInterface This channel, for chaining
     */
    public function then(?callable $whenDrained = null): ChannelInterface
    {
        $this->whenDrained = $whenDrained;

        return $this;
    }

    /**
     * Marks the channel as closed so that `send()` rejects further messages.
     *
     * @return ChannelInterface This channel, for chaining
     */
    public function close(): ChannelInterface
    {
        $this->open = false;

        return $this;
    }

    /**
     * Tells whether the channel is currently closed.
     */
    public function isClosed(): bool
    {
        return !$this->open;
    }

    /**
     * Queues a message for consumption by `getIterator()`.
     *
     * A `null` message is ignored, even when the channel is closed.
     *
     * @param mixed $message A string, scalar, stream resource, `Process` or `Traversable`
     *
     * @return ChannelInterface This channel, for chaining
     *
     * @throws \RuntimeException         When the channel is closed
     * @throws \InvalidArgumentException When the message type is not supported
     */
    public function send($message): ChannelInterface
    {
        if (null === $message) {
            return $this;
        }

        if ($this->isClosed()) {
            throw new \RuntimeException(\sprintf('%s is closed', static::class));
        }

        $this->input[] = self::validateInput(__METHOD__, $message);

        return $this;
    }

    /**
     * Returns whatever `getResult()` of the IPC handle returns.
     *
     * The handle must have been attached with `setup()` beforehand.
     *
     * @return mixed
     */
    public function receive()
    {
        return $this->channel->getResult();
    }

    /**
     * Reads from the input stream.
     *
     * With `$length` of `0` a single line is read and trimmed; otherwise up to
     * `$length` bytes are read as-is. An empty string is returned when the
     * stream is at EOF or the read fails.
     *
     * @param int $length Number of bytes to read, or `0` to read one line
     *
     * @codeCoverageIgnore
     */
    public function read(int $length = 0): string
    {
        // `fgets()`/`fread()` return `false` on EOF or failure. With strict
        // types enabled that value must not reach `trim()` or the `string`
        // return type, otherwise a `TypeError` is raised.
        if ($length === 0) {
            $line = \fgets($this->ipcInput);

            return \is_string($line) ? \trim($line) : '';
        }

        $data = \fread($this->ipcInput, $length);

        return \is_string($data) ? $data : '';
    }

    /**
     * Writes a message to the output stream.
     *
     * @param mixed $message Value to write; it is cast to string
     *
     * @return int Number of bytes written, `0` when the write fails
     *
     * @codeCoverageIgnore
     */
    public function write($message): int
    {
        return self::writeTo($this->ipcOutput, $message);
    }

    /**
     * Writes a message to the error stream.
     *
     * @param mixed $message Value to write; it is cast to string
     *
     * @return int Number of bytes written, `0` when the write fails
     *
     * @codeCoverageIgnore
     */
    public function error($message): int
    {
        return self::writeTo($this->ipcError, $message);
    }

    /**
     * Copies everything from the input stream to the output stream.
     *
     * Uses the streams configured through `setup()`, which default to the
     * standard input and output.
     *
     * @return int Number of bytes copied, `0` when the copy fails
     *
     * @codeCoverageIgnore
     */
    public function passthru(): int
    {
        $copied = \stream_copy_to_stream($this->ipcInput, $this->ipcOutput);

        // `false` signals failure and would violate the `int` return type.
        return \is_int($copied) ? $copied : 0;
    }

    /**
     * Yields the queued messages.
     *
     * The channel is re-opened when iteration starts. While it is open and the
     * queue is empty an empty string is yielded; queued iterators are flattened.
     * Iteration ends once the channel is closed and the queue is empty.
     *
     * @return \Generator
     */
    public function getIterator()
    {
        $this->open = true;

        while ($this->open || $this->input) {
            if (!$this->input) {
                yield '';
                continue;
            }

            $current = \array_shift($this->input);
            if ($current instanceof \Iterator) {
                yield from $current;
            } else {
                yield $current;
            }

            // Once drained, give the callback a chance to queue more input;
            // `send()` ignores a `null` return value.
            $whenDrained = $this->whenDrained;
            if (!$this->input && $this->open && (null !== $whenDrained)) {
                $this->send($whenDrained($this));
            }
        }
    }

    /**
     * Validates and normalizes a Process input.
     *
     * Resources and strings are returned unchanged, other scalars are cast to
     * string, a `Process` is replaced by its iterator, and a non-`Iterator`
     * `Traversable` is wrapped in an `\IteratorIterator`.
     *
     * @param string $caller The name of method call that validates the input
     * @param mixed  $input  The input to validate
     *
     * @return mixed The validated input
     *
     * @throws \InvalidArgumentException In case the input is not valid
     */
    protected static function validateInput(string $caller, $input)
    {
        if (null !== $input) {
            if (\is_resource($input) || \is_string($input)) {
                return $input;
            }

            if (\is_scalar($input)) {
                return (string) $input;
            }

            // @codeCoverageIgnoreStart
            if ($input instanceof Process) {
                return $input->getIterator($input::ITER_SKIP_ERR);
            }

            if ($input instanceof \Iterator) {
                return $input;
            }

            if ($input instanceof \Traversable) {
                return new \IteratorIterator($input);
            }

            throw new \InvalidArgumentException(\sprintf('%s only accepts strings, Traversable objects or stream resources.', $caller));
        }

        return $input;
        // @codeCoverageIgnoreEnd
    }

    /**
     * Writes a message to the given stream, reporting a failed write as `0` bytes.
     *
     * @param resource $stream  Stream to write to
     * @param mixed    $message Value to write; it is cast to string
     *
     * @return int Number of bytes written
     *
     * @codeCoverageIgnore
     */
    private static function writeTo($stream, $message): int
    {
        $written = \fwrite($stream, (string) $message);

        // `fwrite()` returns `false` on failure, which would violate the
        // `int` return type and surface as a `TypeError`.
        return \is_int($written) ? $written : 0;
    }
}