artur-graniszewski/ZEUS-for-PHP

View on GitHub
src/Zeus/Kernel/IpcServer/Adapter/SocketAdapter.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Zeus\Kernel\IpcServer\Adapter;

use Zeus\Kernel\IpcServer\Adapter\Helper\MessagePackager;
use Zeus\Kernel\IpcServer\AnonymousLocalConnectionInterface;
use Zeus\Kernel\IpcServer\MessageSizeLimitInterface;

/**
 * Handles Inter Process Communication using sockets functionality.
 *
 * connect() creates a pair of connected sockets; each end of the pair is
 * addressed as a "channel" (0 or 1). Outgoing messages are passed through
 * packMessage() and terminated with a NUL byte; incoming messages have the
 * NUL terminator stripped and are passed through unpackMessage() (both
 * helpers are presumably provided by the MessagePackager trait).
 *
 * @internal
 */
final class SocketAdapter implements
    IpcAdapterInterface,
    AnonymousLocalConnectionInterface,
    MessageSizeLimitInterface
{
    use MessagePackager;

    /** Number of bytes requested per read; also reported by getMessageSizeLimit(). */
    const MAX_MESSAGE_SIZE = 131072;

    /** @var resource[] sockets of the pair, indexed by channel number */
    protected $ipc = [];

    /** @var string */
    protected $namespace;

    /** @var int channel used by send(), receive() and receiveAll() */
    protected $channelNumber = 0;

    /** @var mixed[] */
    protected $config;

    /** @var bool[] availability flag per channel number */
    protected $activeChannels = [0 => true, 1 => true];

    /** @var bool whether connect() has completed successfully */
    protected $connected = false;

    /**
     * Creates IPC object.
     *
     * Both arguments are only stored; no socket is created until connect().
     *
     * @param string $namespace
     * @param mixed[] $config
     */
    public function __construct($namespace, array $config)
    {
        $this->namespace = $namespace;
        $this->config = $config;
    }

    /**
     * Tells whether connect() has been called successfully.
     *
     * Note: disconnect() does not reset this flag; it only marks channels
     * as unavailable.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->connected;
    }

    /**
     * Creates the socket pair and switches both ends to non-blocking mode.
     *
     * @return $this
     * @throws \LogicException When the connection has already been established.
     * @throws \RuntimeException When sockets are unsupported or the pair cannot be created.
     */
    public function connect()
    {
        if ($this->connected) {
            throw new \LogicException("Connection already established");
        }

        if (!$this->isSupported()) {
            throw new \RuntimeException("Adapter not supported by the PHP configuration");
        }

        // Upper-case the OS prefix itself (PHP_OS may be "WINNT", "WIN32" or "Windows")
        // before comparing, and keep the resulting domain an integer constant.
        $isWindows = strtoupper(substr(PHP_OS, 0, 3)) === 'WIN';
        $domain = $isWindows ? AF_INET : AF_UNIX;

        if (!socket_create_pair($domain, SOCK_SEQPACKET, 0, $this->ipc)) {
            $errorCode = socket_last_error();
            throw new \RuntimeException("Could not create IPC socket: " . socket_strerror($errorCode), $errorCode);
        }

        socket_set_nonblock($this->ipc[0]);
        socket_set_nonblock($this->ipc[1]);

        $this->connected = true;

        return $this;
    }

    /**
     * Selects the channel (end of the socket pair) used by subsequent
     * send(), receive() and receiveAll() calls.
     *
     * @param int $channelNumber
     * @return $this
     * @throws \LogicException When not connected or the channel is unavailable.
     */
    public function useChannelNumber($channelNumber)
    {
        $this->checkChannelAvailability($channelNumber);

        $this->channelNumber = $channelNumber;

        return $this;
    }

    /**
     * Sends a message to the queue.
     *
     * The message is packed, terminated with a NUL byte and written to the
     * socket of the current channel. The socket is switched to blocking mode
     * for the duration of the write and back to non-blocking afterwards.
     *
     * @param string $message
     * @return $this
     * @throws \LogicException When not connected or the current channel is unavailable.
     */
    public function send($message)
    {
        $this->checkChannelAvailability($this->channelNumber);
        $message = $this->packMessage($message);

        $socket = $this->ipc[$this->channelNumber];

        socket_set_block($socket);
        socket_write($socket, $message . "\0", strlen($message) + 1);
        socket_set_nonblock($socket);

        return $this;
    }

    /**
     * Receives a message from the queue.
     *
     * Polls the socket of the current channel for 10 microseconds. When
     * nothing is readable, or when the peer closed the connection or a read
     * failed before a complete NUL-terminated message arrived, null is
     * returned and $success is left false (any partially read data is
     * discarded).
     *
     * @param bool $success Set to true only when a message has been received.
     * @return mixed Received message, or null when none was received.
     * @throws \LogicException When not connected or the current channel is unavailable.
     * @throws \RuntimeException When polling the socket fails.
     */
    public function receive(& $success = false)
    {
        $success = false;
        $this->checkChannelAvailability($this->channelNumber);

        $readSocket = [$this->ipc[$this->channelNumber]];
        $writeSocket = $except = [];

        $value = @socket_select($readSocket, $writeSocket, $except, 0, 10);

        if ($value === false) {
            throw new \RuntimeException(sprintf('Error %d occurred when receiving data from channel number %d', socket_last_error($this->ipc[$this->channelNumber]), $this->channelNumber));
        }

        if ($value === 0) {
            return null;
        }

        $isHhvm = defined('HHVM_VERSION');

        if ($isHhvm) {
            // HHVM: stream_get_line() already strips the "\0" delimiter.
            $message = stream_get_line($readSocket[0], static::MAX_MESSAGE_SIZE, "\0");
        } else {
            $message = '';
            do {
                $chunk = socket_read($readSocket[0], static::MAX_MESSAGE_SIZE);
                if ($chunk === false || $chunk === '') {
                    // Read error (including "would block") or peer closed the
                    // connection: stop instead of spinning forever while
                    // waiting for a terminator that will never arrive.
                    break;
                }
                $message .= $chunk;
            } while (substr($message, -1) !== "\0");

            if (substr($message, -1) !== "\0") {
                // Nothing was read, or the message is incomplete.
                return null;
            }
        }

        if (is_string($message) && $message !== "") {
            $success = true;

            // The raw (non-HHVM) read still carries the trailing "\0".
            return $this->unpackMessage($isHhvm ? $message : substr($message, 0, -1));
        }

        return null;
    }

    /**
     * Receives all messages from the queue.
     *
     * Waits up to one second for the socket of the current channel to become
     * readable, then calls receive() repeatedly until it reports no message.
     *
     * @return mixed[] Received messages.
     * @throws \LogicException When not connected or the current channel is unavailable.
     * @throws \RuntimeException When polling the socket fails inside receive().
     */
    public function receiveAll()
    {
        $this->checkChannelAvailability($this->channelNumber);

        $readSocket = [$this->ipc[$this->channelNumber]];
        $writeSocket = $except = [];
        $messages = [];

        if (@socket_select($readSocket, $writeSocket, $except, 1)) {
            for (;;) {
                $message = $this->receive($success);
                if (!$success) {
                    break;
                }

                $messages[] = $message;
            }
        }

        return $messages;
    }

    /**
     * Shuts down and closes a single channel, or every channel when called
     * with the default -1.
     *
     * Closed channels are marked as unavailable. The "connected" flag is not
     * changed by this method.
     *
     * @param int $channelNumber Channel to close, or -1 for all channels.
     * @return $this
     * @throws \LogicException When a specific channel is requested but the adapter
     *                         is not connected or that channel is unavailable.
     */
    public function disconnect($channelNumber = -1)
    {
        if ($channelNumber !== -1) {
            $this->checkChannelAvailability($channelNumber);
            $socket = $this->ipc[$channelNumber];
            socket_shutdown($socket, 2);
            socket_close($socket);
            unset($this->ipc[$channelNumber]);
            $this->activeChannels[$channelNumber] = false;

            return $this;
        }

        foreach ($this->ipc as $channelNumber => $socket) {
            if (!$socket) {
                // Already closed by an earlier disconnect() call.
                continue;
            }
            socket_shutdown($socket, 2);
            socket_close($socket);
            $this->ipc[$channelNumber] = null;
        }

        $this->activeChannels = [0 => false, 1 => false];

        return $this;
    }

    /**
     * Ensures the adapter is connected and the given channel is still active.
     *
     * @param int $channelNumber
     * @throws \LogicException When not connected or the channel is unavailable.
     */
    protected function checkChannelAvailability($channelNumber)
    {
        if (!$this->connected) {
            throw new \LogicException("Connection is not established");
        }
        if (!isset($this->activeChannels[$channelNumber]) || $this->activeChannels[$channelNumber] !== true) {
            throw new \LogicException(sprintf('Channel number %d is unavailable', $channelNumber));
        }
    }

    /**
     * Tells whether the socket_create_pair() function is available.
     *
     * @return bool
     */
    public function isSupported()
    {
        return function_exists('socket_create_pair');
    }

    /**
     * Returns the value of MAX_MESSAGE_SIZE.
     *
     * @return int
     */
    public function getMessageSizeLimit()
    {
        return static::MAX_MESSAGE_SIZE;
    }
}