artur-graniszewski/ZEUS-for-PHP

View on GitHub
src/Zeus/Kernel/IpcServer/Adapter/FifoAdapter.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Zeus\Kernel\IpcServer\Adapter;

use Zeus\Kernel\IpcServer\Adapter\Helper\MessagePackager;
use Zeus\Kernel\IpcServer\MessageSizeLimitInterface;
use Zeus\Kernel\IpcServer\NamedLocalConnectionInterface;
use Zeus\Kernel\Networking\FlushableConnectionInterface;
use Zeus\Kernel\Networking\FileStream;

/**
 * IPC adapter that exchanges messages through a pair of named pipes (FIFOs).
 *
 * connect() creates two FIFO files named "<cwd>/<namespace>.<channel>" (channels 0 and 1)
 * and opens both of them in non-blocking mode. A party that selected channel N with
 * useChannelNumber() receives from FIFO N and sends to the opposite FIFO, so two parties
 * using different channel numbers can talk to each other.
 *
 * Messages are packed/unpacked by the MessagePackager trait and terminated with a NUL byte
 * on the wire.
 *
 * @package Zeus\Kernel\IpcServer\Adapter
 * @internal
 */
final class FifoAdapter implements
    IpcAdapterInterface,
    NamedLocalConnectionInterface,
    MessageSizeLimitInterface
{
    use MessagePackager;

    /** @var FileStream[]|null[] streams indexed by channel number, null once a channel has been closed */
    protected $ipc = [];

    /** @var string prefix of the FIFO file names */
    protected $namespace;

    /** @var int channel used for receiving; sending goes to the opposite channel */
    protected $channelNumber = 0;

    /** @var mixed[] */
    protected $config;

    /** @var bool[] channel number => whether the channel may still be used */
    protected $activeChannels = [0 => true, 1 => true];

    /** @var bool true once connect() has succeeded at least once */
    protected $connected = false;

    /** @var int|null largest single write accepted by a FIFO, probed once per process */
    protected static $maxPipeCapacity = null;

    /**
     * Creates IPC object.
     *
     * @param string $namespace
     * @param mixed[] $config
     */
    public function __construct($namespace, array $config)
    {
        $this->namespace = $namespace;
        $this->config = $config;
    }

    /**
     * Tells whether the adapter has been connected and still has at least one open channel.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->connected && in_array(true, $this->activeChannels, true);
    }

    /**
     * Creates and opens both FIFO channels.
     *
     * May be called again after all channels have been closed with disconnect().
     *
     * @return $this
     * @throws \LogicException when the connection is already established
     * @throws \RuntimeException when FIFOs are not supported or a FIFO cannot be created/opened
     */
    public function connect()
    {
        if ($this->isConnected()) {
            throw new \LogicException("Connection already established");
        }

        if (!$this->isSupported()) {
            throw new \RuntimeException("Adapter not supported by the PHP configuration");
        }

        // open everything first, so a failure on the second FIFO leaves the adapter state untouched
        $handles = [];
        foreach ([0, 1] as $channelNumber) {
            $handles[$channelNumber] = $this->openFifo($this->getFilename($channelNumber));
        }

        foreach ($handles as $channelNumber => $handle) {
            $this->ipc[$channelNumber] = new FileStream($handle);
        }

        // probe the pipe capacity up front, so send() does not pay for it later
        $this->getMessageSizeLimit();

        $this->activeChannels = [0 => true, 1 => true];
        $this->connected = true;

        return $this;
    }

    /**
     * Creates the given FIFO (unless it already exists) and opens it in non-blocking mode.
     *
     * @param string $fileName
     * @return resource
     * @throws \RuntimeException when the FIFO cannot be created or opened
     */
    private function openFifo($fileName)
    {
        // An already existing file is reused. The second file_exists() covers the case
        // where another process created the FIFO between our check and posix_mkfifo().
        if (!file_exists($fileName) && !posix_mkfifo($fileName, 0600) && !file_exists($fileName)) {
            throw new \RuntimeException(sprintf('Could not create FIFO file "%s"', $fileName));
        }

        $handle = fopen($fileName, "r+"); // ensures at least one writer (us) so will be non-blocking
        if ($handle === false) {
            throw new \RuntimeException(sprintf('Could not open FIFO file "%s"', $fileName));
        }

        stream_set_blocking($handle, false); // prevent fread / fwrite blocking

        return $handle;
    }

    /**
     * Ensures that the adapter has been connected and the given channel is still open.
     *
     * @param int $channelNumber
     * @throws \LogicException when connect() was not called or the channel is unavailable
     */
    protected function checkChannelAvailability($channelNumber)
    {
        if (!$this->connected) {
            throw new \LogicException("Connection is not established");
        }

        if (!isset($this->activeChannels[$channelNumber]) || $this->activeChannels[$channelNumber] !== true) {
            throw new \LogicException(sprintf('Channel number %d is unavailable', $channelNumber));
        }
    }

    /**
     * Builds the path of the FIFO file backing the given channel.
     *
     * @param int $channelNumber
     * @return string
     */
    protected function getFilename($channelNumber)
    {
        return sprintf("%s/%s.%d", getcwd(), $this->namespace, $channelNumber);
    }

    /**
     * Selects the channel this instance receives from.
     *
     * @param int $channelNumber
     * @return $this
     * @throws \LogicException when the channel is unavailable
     */
    public function useChannelNumber($channelNumber)
    {
        $this->checkChannelAvailability($channelNumber);

        $this->channelNumber = $channelNumber;

        return $this;
    }

    /**
     * Sends a message to the queue.
     *
     * The message is written to the channel opposite to the one selected for receiving.
     *
     * @param string $message
     * @return $this
     * @throws \LogicException when the target channel is unavailable
     * @throws \RuntimeException when the packed message does not fit into a single pipe write
     */
    public function send($message)
    {
        // loose comparison on purpose: a channel number given as "0" must still map to channel 1
        $channelNumber = $this->channelNumber == 0 ? 1 : 0;

        $this->checkChannelAvailability($channelNumber);

        $message = $this->packMessage($message);
        $sizeLimit = $this->getMessageSizeLimit();

        // +1 accounts for the NUL terminator appended below
        if (strlen($message) + 1 > $sizeLimit) {
            throw new \RuntimeException(sprintf("Message length exceeds max packet size of %d bytes",  $sizeLimit));
        }

        $stream = $this->ipc[$channelNumber];
        $stream->write($message . "\0");
        if ($stream instanceof FlushableConnectionInterface) {
            $stream->flush();
        }

        return $this;
    }

    /**
     * Receives a message from the queue.
     *
     * Does not wait for data: when nothing is readable, null is returned immediately.
     *
     * @param bool $success set to true only when a message has been read and unpacked
     * @return mixed Received message, or null when there was nothing to read.
     * @throws \LogicException when the selected channel is unavailable
     */
    public function receive(& $success = false)
    {
        $success = false;
        $channelNumber = $this->channelNumber;

        $this->checkChannelAvailability($channelNumber);

        $stream = $this->ipc[$channelNumber];

        if (!$stream->select(0)) {
            return null;
        }

        $message = $stream->read("\0");

        if (!is_string($message)) {
            return null;
        }

        $message = $this->unpackMessage($message);
        $success = true;

        return $message;
    }

    /**
     * Receives all messages from the queue.
     *
     * Calls select(1) on the channel first, then collects messages until receive() reports
     * that nothing more could be read.
     *
     * @return mixed[] Received messages.
     * @throws \LogicException when the selected channel is unavailable
     * @throws \RuntimeException when the channel stream has already been closed
     */
    public function receiveAll()
    {
        $channelNumber = $this->channelNumber;

        $this->checkChannelAvailability($channelNumber);

        if (!isset($this->ipc[$channelNumber])) {
            throw new \RuntimeException('Channel number ' . $channelNumber . ' is already closed');
        }

        if (!$this->ipc[$channelNumber]->select(1)) {
            return [];
        }

        $messages = [];

        for (;;) {
            $message = $this->receive($success);
            if (!$success) {
                break;
            }

            $messages[] = $message;
        }

        return $messages;
    }

    /**
     * Closes a single channel, or all channels when called without an argument.
     *
     * Closing a single channel keeps its FIFO file on disk. Closing all channels also removes
     * the FIFO files, including those of channels that were closed individually before.
     *
     * @param int $channelNumber channel to close, -1 closes everything
     * @return $this
     * @throws \LogicException when a specific channel is requested but unavailable
     */
    public function disconnect($channelNumber = -1)
    {
        if ($channelNumber !== -1) {
            $this->checkChannelAvailability($channelNumber);

            $this->ipc[$channelNumber]->close();
            $this->ipc[$channelNumber] = null;
            $this->activeChannels[$channelNumber] = false;

            return $this;
        }

        foreach ($this->ipc as $channelNumber => $stream) {
            if ($stream) {
                $stream->close();
                $this->ipc[$channelNumber] = null;
            }

            // The file may be gone already if another holder of this FIFO removed it first.
            $fileName = $this->getFilename($channelNumber);
            if (file_exists($fileName)) {
                unlink($fileName);
            }
        }

        $this->activeChannels = [0 => false, 1 => false];

        return $this;
    }

    /**
     * Tells whether the POSIX FIFO functions are available in this PHP build.
     *
     * @return bool
     */
    public function isSupported()
    {
        return function_exists('posix_mkfifo');
    }

    /**
     * Returns the largest message size (in bytes) that can be written to a FIFO in one call.
     *
     * The value is probed once per process with a temporary FIFO (channel number 2): writes of
     * doubling size are attempted until one of them is not accepted in full, up to 512 KiB.
     *
     * @return int
     * @throws \RuntimeException when the temporary FIFO cannot be created or opened
     */
    public function getMessageSizeLimit()
    {
        if (!self::$maxPipeCapacity) {
            $probeLimit = 524288;
            $fileName = $this->getFilename(2);
            $probe = $this->openFifo($fileName);

            $size = 1;
            $payload = str_repeat('a', $probeLimit);
            while ($size < $probeLimit) {
                if (fwrite($probe, $payload, $size) !== $size) {
                    // this write was refused or truncated: the previous size was the last good one
                    $size = $size >> 1;
                    break;
                }

                // read the written data back before trying the next, larger write
                fgets($probe);
                $size = $size << 1;
            }

            fclose($probe);
            unlink($fileName);

            self::$maxPipeCapacity = $size;
        }

        return self::$maxPipeCapacity;
    }
}