artur-graniszewski/ZEUS-for-PHP

View on GitHub
src/Zeus/Kernel/IpcServer/Adapter/MsgAdapter.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Zeus\Kernel\IpcServer\Adapter;

use Zeus\Kernel\IpcServer\Adapter\Helper\MessagePackager;
use Zeus\Kernel\IpcServer\MessageQueueCapacityInterface;
use Zeus\Kernel\IpcServer\MessageSizeLimitInterface;
use Zeus\Kernel\IpcServer\NamedLocalConnectionInterface;

/**
 * Handles Inter Process Communication using SystemV functionality.
 *
 * @internal
 */
final class MsgAdapter implements
    IpcAdapterInterface,
    NamedLocalConnectionInterface,
    MessageSizeLimitInterface,
    MessageQueueCapacityInterface
{
    use MessagePackager;

    /**
     * Queue links.
     *
     * @var resource[]
     */
    protected $ipc;

    /** @var string */
    protected $namespace;

    /** @var mixed[] */
    protected $config;

    /** @var int */
    protected $channelNumber = 0;

    /** @var bool[] */
    protected $activeChannels = [0 => true, 1 => true];

    /** @var int[] */
    protected $queueInfo;

    /** @var bool */
    protected $connected;

    /**
     * Creates IPC object.
     *
     * @param string $namespace
     * @param mixed[] $config
     */
    public function __construct($namespace, array $config)
    {
        $this->namespace = $namespace;
        $this->config = $config;
    }

    /**
     * @return bool
     */
    public function isConnected()
    {
        return $this->connected;
    }

    /**
     * @return $this
     */
    public function connect()
    {
        if ($this->connected) {
            throw new \LogicException("Connection already established");
        }

        if (!$this->isSupported()) {
            throw new \RuntimeException("Adapter not supported by the PHP configuration");
        }

        $id1 = $this->getQueueId();
        $this->ipc[0] = msg_get_queue($id1, 0600);
        msg_set_queue($this->ipc[0], ['msg_qbytes' => $this->getMessageSizeLimit()]);
        $id2 = $this->getQueueId();
        $this->ipc[1] = msg_get_queue($id2, 0600);
        msg_set_queue($this->ipc[0], ['msg_qbytes' => $this->getMessageSizeLimit()]);

        if (!$id1 || !$id2) {
            // something went wrong
            throw new \RuntimeException("Failed to find a queue for IPC");
        }

        $this->connected = true;

        return $this;
    }

    /**
     * @param int $channelNumber
     */
    protected function checkChannelAvailability($channelNumber)
    {
        if (!$this->connected) {
            throw new \LogicException("Connection is not established");
        }

        if (!isset($this->activeChannels[$channelNumber]) || $this->activeChannels[$channelNumber] !== true) {
            throw new \LogicException(sprintf('Channel number %d is unavailable', $channelNumber));
        }
    }

    /**
     * @return int
     */
    protected function getQueueId()
    {
        $queueId = 0;
        $info = $this->getQueueInfo();
        $maxQueueId = $info['queues_max'];

        while ($queueId < $maxQueueId) {
            if (!msg_queue_exists($queueId)) {
                return $queueId;
            }

            $queueId++;
        }

        throw new \RuntimeException('No available queue was found');
    }

    /**
     * Sends a message to the queue.
     *
     * @param string $message
     * @return $this
     */
    public function send($message)
    {
        $channelNumber = $this->channelNumber;

        $channelNumber == 0 ?
            $channelNumber = 1
            :
            $channelNumber = 0;

        $this->checkChannelAvailability($channelNumber);
        $message = $this->packMessage($message);

        if (strlen($message) + 1 > $this->getMessageSizeLimit()) {
            throw new \RuntimeException(sprintf("Message length exceeds max packet size of %d bytes",  $this->getMessageSizeLimit()));
        }

        function_exists('error_clear_last') ? error_clear_last() : @trigger_error("", E_USER_NOTICE);
        if (!@msg_send($this->ipc[$channelNumber], 1, $message, true, true, $errorNumber)) {
            $error = error_get_last();
            throw new \RuntimeException(sprintf('Error %d occurred when sending message to channel %d: %s', $errorNumber, $channelNumber, $error['message']));
        }

        return $this;
    }

    /**
     * Receives a message from the queue.
     *
     * @param bool $success
     * @return mixed Received message.
     */
    public function receive(& $success = false)
    {
        $success = false;
        $channelNumber = $this->channelNumber;
        $this->checkChannelAvailability($channelNumber);

        $messageType = 1;
        $success = msg_receive($this->ipc[$channelNumber], $messageType, $messageType,  $this->getMessageSizeLimit(), $message, true, MSG_IPC_NOWAIT);

        return $this->unpackMessage($message);
    }

    /**
     * Receives all messages from the queue.
     *
     * @return mixed[] Received messages.
     */
    public function receiveAll()
    {
        $channelNumber = $this->channelNumber;
        $this->checkChannelAvailability($channelNumber);

        $messages = [];

        // early elimination
        $stats = msg_stat_queue($this->ipc[$channelNumber]);
        if (!$stats['msg_qnum']) {

            // nothing to read
            return $messages;
        }

        for(;;) {
            $message = $this->receive($success);

            if (!$success) {
                break;
            }

            $messages[] = $message;
        }

        return $messages;
    }

    /**
     * Destroys this IPC object.
     *
     * @param int $channelNumber
     * @return $this
     */
    public function disconnect($channelNumber = -1)
    {
        if ($channelNumber !== -1) {
            $this->checkChannelAvailability($channelNumber);

            msg_remove_queue($this->ipc[$channelNumber]);
            unset($this->ipc[$channelNumber]);
            $this->activeChannels[$channelNumber] = false;

            return $this;
        }

        foreach (array_keys($this->ipc) as $channelNumber) {
            msg_remove_queue($this->ipc[$channelNumber]);
            unset($this->ipc[$channelNumber]);
        }

        $this->activeChannels = [0 => false, 1 => false];

        return $this;
    }

    /**
     * @return bool
     */
    public function isSupported()
    {
        return function_exists('msg_stat_queue');
    }

    /**
     * @param int $channelNumber
     * @return $this
     */
    public function useChannelNumber($channelNumber)
    {
        $this->checkChannelAvailability($channelNumber);

        $this->channelNumber = $channelNumber;

        return $this;
    }

    /**
     * @return int
     */
    public function getMessageSizeLimit()
    {
        $info = $this->getQueueInfo();

        return $info['msg_qbytes'];
    }

    /**
     * @return int
     */
    public function getMessageQueueCapacity()
    {
        $info = $this->getQueueInfo();

        return $info['msg_default'];
    }

    /**
     * @return int[]
     */
    protected function getQueueInfo()
    {
        if (!$this->queueInfo) {
            $queueId = null;
            $queue = $this->ipc[0] ? $this->ipc[0] : ($this->ipc[1] ? $this->ipc[1] : null);

            // detect queue limits...
            $this->queueInfo['msg_default'] = 10;
            $this->queueInfo['queues_max'] = 256;
            foreach (['msg_default', 'queues_max'] as $setting) {
                $fileName = '/proc/sys/fs/mqueue/' . $setting;
                if (is_readable($fileName)) {
                    $this->queueInfo[$setting] = (int)file_get_contents($fileName);
                }
            }

            if (!$queue) {
                $queueId = $this->getQueueId();
                $queue = msg_get_queue($queueId, 0600);
            }

            $this->queueInfo = array_merge($this->queueInfo, msg_stat_queue($queue));
            if ($queueId) {
                msg_remove_queue($queue);
            }
        }

        return $this->queueInfo;
    }
}