jndrm/mqtt-packet

View on GitHub
src/ControlPacket.php

Summary

Maintainability
A
2 hrs
Test Coverage
<?php

namespace Drmer\Mqtt\Packet;

use Drmer\Mqtt\Packet\Protocol\Version;
use Drmer\Mqtt\Packet\Protocol\Version4;
use Drmer\Mqtt\Packet\Utils\MessageHelper;

abstract class ControlPacket
{

    // packet identifer index
    // set this to -1 if subclass need to
    // proccess identifier itself.
    const ID_INDEX = 2;

    /** @var $version Version */
    protected $version;

    protected $payload = '';

    protected $identifier;

    public function __construct()
    {
        $this->version = new Version4();
    }

    public function setVersion(Version $version)
    {
        $this->version = $version;
    }

    public function setIdentifier($identifier)
    {
        $this->identifier = intval($identifier);
        return $this;
    }

    public function getIdentifier()
    {
        return $this->identifier;
    }

    public function parse($rawInput)
    {
        // before parsing we need to clean payload
        $this->payload = '';
        if (static::ID_INDEX > 0) {
            $this->identifier = $this->parseIdentifier($rawInput, static::ID_INDEX);
        }
    }

    public function parseIdentifier($rawInput, $startIndex)
    {
        if (strlen($rawInput) < $startIndex + 2) {
            return null;
        }
        $identifier = unpack('n', substr($rawInput, $startIndex, 2));
        return array_pop($identifier);
    }

    /** @return int */
    public static function getControlPacketType()
    {
        throw new \RuntimeException('you must overwrite getControlPacketType()');
    }

    public function getPayload()
    {
        return $this->payload;
    }

    protected function getRemainingLength()
    {
        return strlen($this->getVariableHeader()) + strlen($this->getPayload());
    }

    /**
     * @return string
     */
    protected function getFixedHeader()
    {
        // Figure 3.8
        $byte1 = static::getControlPacketType() << 4;
        $byte1 = $this->addReservedBitsToFixedHeaderControlPacketType($byte1);

        $byte2 = $this->getRemainingLength();

        return chr($byte1)
             . static::encodeLength($byte2);
    }

    /**
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc304802782
     *
     */
    protected static function encodeLength($len)
    {
        $res = '';
        do {
            $digit = $len % 128;
            $len = $len >> 7;
            if ($len > 0) {
                $digit = ($digit | 0x80);
            }
            $res .= chr($digit);
        } while ($len > 0);
        return $res;
    }

    /**
     *
     * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc304802782
     *
     */
    protected static function decodeLength(&$startIndex, $rawInput)
    {
        $multi = 1;
        $value = 0;
        do {
            $digit = ord($rawInput{$startIndex});
            $value += ($digit & 127) * $multi;
            $multi *= 128;
            $startIndex += 1;
        } while (($digit & 128) != 0);
        return $value;
    }

    /**
     * @return string
     */
    protected function getVariableHeader()
    {
        if (is_null($this->identifier)) {
            return '';
        }
        return pack('n', $this->identifier);
    }

    /**
     * @param $stringToAdd
     */
    public function addRawToPayLoad($stringToAdd)
    {
        $this->payload .= $stringToAdd;
    }

    /**
     * @param $fieldPayload
     */
    public function addLengthPrefixedField($fieldPayload)
    {
        $return = $this->getLengthPrefixField($fieldPayload);
        $this->addRawToPayLoad($return);
    }

    public function getLengthPrefixField($fieldPayload)
    {
        $stringLength = strlen($fieldPayload);
        $msb = $stringLength >> 8;
        $lsb = $stringLength % 256;
        $return = chr($msb);
        $return .= chr($lsb);
        $return .= $fieldPayload;

        return $return;
    }

    public function get()
    {
        return $this->getFixedHeader() .
               $this->getVariableHeader() .
               $this->getPayload();
    }

    /**
     * @param $byte1
     * @return $byte1 unmodified
     */
    protected function addReservedBitsToFixedHeaderControlPacketType($byte1)
    {
        return $byte1;
    }

    /**
     * @param int $startIndex
     * @param string $rawInput
     * @return string
     */
    protected static function getPayloadLengthPrefixFieldInRawInput($startIndex, $rawInput)
    {
        $headerLength = 2;
        $header = substr($rawInput, $startIndex, $headerLength);
        $lengthOfMessage = ord($header{1});

        return substr($rawInput, $startIndex + $headerLength, $lengthOfMessage);
    }

    /**
     * Read string from buffer.
     * @param $buffer
     * @return string
     * @see https://github.com/walkor/mqtt/blob/master/src/Protocols/Mqtt.php#readString
     */
    public static function readString(&$buffer)
    {
        $tmp = unpack('n', $buffer);
        $length = array_pop($tmp);
        if ($length + 2 > strlen($buffer)) {
            throw new \RuntimeException("buffer:".bin2hex($buffer)." lenth:$length not enough for unpackString");
        }

        $string = substr($buffer, 2, $length);
        $buffer = substr($buffer, $length + 2);
        return $string;
    }

    public function debugPrint()
    {
        echo "\n";
        echo MessageHelper::getReadableByRawString($this->get());
    }

    /**
     * Read unsigned short int from buffer.
     * @param $buffer
     * @return mixed
     * @see https://github.com/walkor/mqtt/blob/master/src/Protocols/Mqtt.php#readShortInt
     */
    public static function readShortInt(&$buffer)
    {
        $tmp = unpack('n', $buffer);
        $buffer = substr($buffer, 2);
        return array_pop($tmp);
    }
}