jndrm/mqtt-client

View on GitHub
src/BaseClient.php

Summary

Maintainability
A
2 hrs
Test Coverage
<?php

namespace Drmer\Mqtt\Client;

use Drmer\Mqtt\Packet\Protocol\Version;
use Drmer\Mqtt\Packet\Protocol\Version4;
use Drmer\Mqtt\Packet\Connect;
use Drmer\Mqtt\Packet\Publish;
use Drmer\Mqtt\Packet\Disconnect;
use Drmer\Mqtt\Packet\Subscribe;
use Drmer\Mqtt\Packet\Unsubscribe;
use Drmer\Mqtt\Packet\ControlPacket;
use Drmer\Mqtt\Packet\ConnectionOptions;
use Drmer\Mqtt\Packet\Utils\Parser;
use Drmer\Mqtt\Packet\ControlPacketType;
use Drmer\Mqtt\Packet\ConnectionAck;
use Drmer\Mqtt\Packet\PingRequest;
use Drmer\Mqtt\Packet\PublishRelease;
use Drmer\Mqtt\Packet\PublishReceived;
use Drmer\Mqtt\Packet\PublishComplete;

abstract class BaseClient extends EventEmitter
{
    protected $socket;

    protected $version;
    protected $messageCounter = 0;
    protected $connectOptions = null;

    public $debug = false;

    abstract protected function socketOpen($host, $port);
    abstract protected function socketSend($data);
    abstract protected function socketClose();
    abstract protected function timerTick($seconds, $callback);

    public function __construct(Version $version)
    {
        $this->version = $version;
    }

    public static function v4()
    {
        if (get_called_class() == 'Drmer\Mqtt\Client\BaseClient') {
            throw new \RuntimeException("Could not instance from BaseClient");
        }
        return new static(new Version4());
    }

    public function connect($host, $port, $opts=[])
    {
        $this->connectOptions = new ConnectionOptions($opts);

        $this->on('start', [$this, 'onStart']);

        return $this->socketOpen($host, $port);
    }

    public function onStart()
    {
        $packet = new Connect($this->connectOptions);
        $this->sendPacket($packet);
    }

    protected function sendPacket(ControlPacket $packet)
    {
        if ($this->debug) {
            echo "send:\t\t" . get_class($packet);
            $packet->debugPrint();
        }
        try {
            $this->socketSend($packet->get());
        } catch (\Exception $e) {
            if ($this->debug) {
                echo $e->getMessage() . "\n";
            }
        }
    }

    public function publish($topic, $message, $qos = 1, $dup = false, $retain = false)
    {
        $packet = new Publish($this->version);
        $packet->setTopic($topic);
        $packet->setIdentifier($this->messageCounter++);
        $packet->setQos($qos);
        $packet->setDup($dup);
        $packet->setRetain($retain);
        $packet->setPayload($message);
        $this->sendPacket($packet);
    }

    public function subscribe($topic, $qos = 0)
    {
        $packet = new Subscribe();
        $packet->addSubscription($topic, $qos);
        $packet->setIdentifier($this->messageCounter++);
        return $this->sendPacket($packet);
    }

    public function unsubscribe($topic)
    {
        $packet = new Unsubscribe();
        $packet->removeSubscription($topic);
        return $this->sendPacket($packet);
    }

    public function disconnect()
    {
        $packet = new Disconnect();
        return $this->sendPacket($packet);
    }

    public function close()
    {
        $this->socketClose();
    }

    protected function onReceive($data)
    {
        $packet = Parser::parse($data);
        if ($packet == null) {
            return;
        }
        $controlType = ord($data{0}) >> 4;
        if ($this->debug) {
            $cmd = Parser::getCmd($controlType);
            echo "receive data ($cmd): ";
            $packet->debugPrint();
        }

        switch ($controlType) {
            case ControlPacketType::CONNACK:
                $this->onConnected($packet);
                break;
            case ControlPacketType::SUBACK:
                $this->onSubscribeAck($packet);
                break;
            case ControlPacketType::PUBLISH:
                $this->onMessage($packet);
                break;
            case ControlPacketType::PUBACK:
                $this->onPublichAck($packet);
                break;
            case ControlPacketType::PUBREC:
                $this->onPublishReceived($packet);
                break;
            case ControlPacketType::PUBREL:
                $this->onPublishReleased($packet);
                break;
            case ControlPacketType::PUBCOMP:
                $this->onPublishComplete($packet);
                break;
            default:
                break;
        }
    }

    public function onConnected($packet)
    {
        if (($keepAlive = $this->connectOptions->keepAlive) > 0) {
            $this->timerTick($keepAlive / 2, function () {
                $this->sendPacket(new PingRequest());
            });
        }
        $this->emit('connected', $packet);
    }

    public function onSubscribeAck($packet)
    {
    }

    public function onMessage($packet)
    {
        $this->emit('message', $packet);

        $receivedPacket = new PublishReceived();
        $receivedPacket->setIdentifier($packet->getIdentifier());
        $this->sendPacket($receivedPacket);
    }

    public function onPublichAck($packet)
    {
    }

    protected function onPublishReceived($packet)
    {
        $releasePacket = new PublishRelease();
        $releasePacket->setIdentifier($packet->getIdentifier());
        $this->sendPacket($releasePacket);
    }


    public function onPublishReleased($packet)
    {
        $completePacket = new PublishComplete();
        $completePacket->setIdentifier($packet->getIdentifier());
        $this->sendPacket($completePacket);
    }

    protected function onPublishComplete($packet)
    {
        if ($this->debug) {
            echo "public complete \n";
        }
    }
}