KonstantinCodes/messenger-kafka

View on GitHub
src/Messenger/RestProxyTransport.php

Summary

Maintainability
A
1 hr
Test Coverage
F
0%
<?php

declare(strict_types=1);

namespace Koco\Kafka\Messenger;

use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class RestProxyTransport implements TransportInterface, MessageCountAwareInterface
{
    /** @var UriInterface */
    private $baseUri;

    /** @var string */
    private $topicName;

    /** @var SerializerInterface */
    private $serializer;

    /** @var ClientInterface */
    private $client;

    /** @var RequestFactoryInterface */
    private $requestFactory;

    /** @var UriFactoryInterface */
    private $uriFactory;

    /** @var StreamFactoryInterface */
    private $streamFactory;

    /** @var LoggerInterface|null */
    private $logger;

    private $receiver;

    /** @var RestProxySender */
    private $sender;

    public function __construct(
        UriInterface $baseUri,
        string $topicName,
        SerializerInterface $serializer,
        ClientInterface $client,
        RequestFactoryInterface $requestFactory,
        UriFactoryInterface $uriFactory,
        StreamFactoryInterface $streamFactory,
        ?LoggerInterface $logger = null
    ) {
        $this->baseUri = $baseUri;
        $this->topicName = $topicName;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->client = $client;
        $this->requestFactory = $requestFactory;
        $this->uriFactory = $uriFactory;
        $this->streamFactory = $streamFactory;
    }

    /**
     * {@inheritdoc}
     */
    public function get(): iterable
    {
        throw new \Exception('Not implemented!');
    }

    /**
     * {@inheritdoc}
     */
    public function ack(Envelope $envelope): void
    {
        throw new \Exception('Not implemented!');
    }

    /**
     * {@inheritdoc}
     */
    public function reject(Envelope $envelope): void
    {
        throw new \Exception('Not implemented!');
    }

    /**
     * {@inheritdoc}
     */
    public function send(Envelope $envelope): Envelope
    {
        return ($this->sender ?? $this->getSender())->send($envelope);
    }

    /**
     * {@inheritdoc}
     */
    public function getMessageCount(): int
    {
        throw new \Exception('Not implemented!');
    }

    private function getSender(): RestProxySender
    {
        return $this->sender = new RestProxySender(
            $this->baseUri,
            $this->topicName,
            $this->serializer,
            $this->client,
            $this->requestFactory,
            $this->uriFactory,
            $this->streamFactory
        );
    }
}