idealo/php-rdkafka-ffi

View on GitHub
src/RdKafka/KafkaConsumer.php

Summary

Maintainability
B
6 hrs
Test Coverage
B
89%
<?php

declare(strict_types=1);

namespace RdKafka;

use FFI;
use InvalidArgumentException;
use RdKafka;
use RdKafka\FFI\Library;
use TypeError;
use function count;
use function is_array;
use function is_int;
use function is_string;
use function sprintf;

class KafkaConsumer extends RdKafka
{
    private bool $closed;

    public function __construct(Conf $conf)
    {
        try {
            $conf->get('group.id');
        } catch (Exception $exception) {
            throw new Exception('"group.id" must be configured.', $exception->getCode(), $exception);
        }

        $this->closed = false;

        parent::__construct(RD_KAFKA_CONSUMER, $conf);

        Library::rd_kafka_poll_set_consumer($this->kafka);
    }

    public function __destruct()
    {
        $this->close();

        parent::__destruct();
    }

    public function close(): void
    {
        if ($this->closed) {
            return;
        }

        $err = (int) Library::rd_kafka_consumer_close($this->kafka);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            trigger_error(sprintf('rd_kafka_consumer_close failed: %s', rd_kafka_err2str($err)), E_USER_WARNING);
        }

        $this->closed = true;
    }

    /**
     * @param TopicPartition[] $topic_partitions
     *
     * @throws Exception
     */
    public function assign(?array $topic_partitions = null): void
    {
        $nativeTopicPartitionList = null;

        if ($topic_partitions !== null) {
            $topicPartitionList = new TopicPartitionList(...$topic_partitions);
            $nativeTopicPartitionList = $topicPartitionList->getCData();
        }

        $err = Library::rd_kafka_assign($this->kafka, $nativeTopicPartitionList);

        if ($nativeTopicPartitionList !== null) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
        }

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw Exception::fromError($err);
        }
    }

    /**
     * @return TopicPartition[]
     */
    public function getAssignment(): array
    {
        $nativeTopicPartitionList = Library::rd_kafka_topic_partition_list_new(0);

        Library::rd_kafka_assignment($this->kafka, FFI::addr($nativeTopicPartitionList));

        $topicPartitionList = TopicPartitionList::fromCData($nativeTopicPartitionList);

        Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);

        return $topicPartitionList->asArray();
    }

    /**
     * @param Message|TopicPartition[]|null $message_or_offsets
     *
     * @throws Exception
     */
    public function commit($message_or_offsets = null): void
    {
        try {
            $topicPartitionList = $this->createTopicPartitionList($message_or_offsets);
        } catch (TypeError $exception) {
            throw new InvalidArgumentException(
                sprintf(
                    '%s expects parameter %d to be %s, %s given',
                    __METHOD__,
                    1,
                    'an instance of RdKafka\\Message, an array of RdKafka\\TopicPartition or null',
                    gettype($message_or_offsets)
                ),
                $exception->getCode(),
                $exception
            );
        }
        $this->commitInternal($topicPartitionList, false);
    }

    /**
     * @param Message|TopicPartition[]|null $message_or_offsets
     *
     * @throws Exception
     */
    public function commitAsync($message_or_offsets = null): void
    {
        try {
            $topicPartitionList = $this->createTopicPartitionList($message_or_offsets);
        } catch (TypeError $exception) {
            throw new InvalidArgumentException(
                sprintf(
                    '%s expects parameter %d to be %s, %s given',
                    __METHOD__,
                    1,
                    'an instance of RdKafka\\Message, an array of RdKafka\\TopicPartition or null',
                    gettype($message_or_offsets)
                ),
                $exception->getCode(),
                $exception
            );
        }
        $this->commitInternal($topicPartitionList, true);
    }

    private function commitInternal(?TopicPartitionList $topicPartitionList, bool $isAsync): void
    {
        $nativeTopicPartitionList = null;
        if ($topicPartitionList !== null) {
            $nativeTopicPartitionList = $topicPartitionList->getCData();
        }

        $err = Library::rd_kafka_commit($this->kafka, $nativeTopicPartitionList, $isAsync ? 1 : 0);

        if ($nativeTopicPartitionList !== null) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
        }

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw Exception::fromError($err);
        }
    }

    private function createTopicPartitionList($message_or_offsets): ?TopicPartitionList
    {
        if ($message_or_offsets === null) {
            return null;
        }
        if ($message_or_offsets instanceof Message) {
            return $this->createTopicPartitionListFromMessage($message_or_offsets);
        }
        if (is_array($message_or_offsets) === true) {
            return new TopicPartitionList(...$message_or_offsets);
        }

        throw new TypeError(
            sprintf(
                'Argument 1 passed to %s must be an instance of RdKafka\\Message, an array of RdKafka\\TopicPartition or null',
                __METHOD__
            )
        );
    }

    private function createTopicPartitionListFromMessage(Message $message): TopicPartitionList
    {
        if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw new Exception('Invalid argument: Specified Message has an error');
        }
        if (is_string($message->topic_name) === false) {
            throw new Exception('Invalid argument: Specified Message\'s topic_name is not a string');
        }
        if (is_int($message->partition) === false) {
            throw new Exception('Invalid argument: Specified Message\'s partition is not an int');
        }
        if (is_int($message->offset) === false) {
            throw new Exception('Invalid argument: Specified Message\'s offset is not an int');
        }

        return new TopicPartitionList(
            new TopicPartition($message->topic_name, $message->partition, $message->offset + 1)
        );
    }

    /**
     * @throws InvalidArgumentException
     */
    public function consume(int $timeout_ms): Message
    {
        $nativeMessage = Library::rd_kafka_consumer_poll($this->kafka, $timeout_ms);

        if ($nativeMessage === null) {
            $nativeMessage = Library::new('rd_kafka_message_t');
            $nativeMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;

            $message = new Message(FFI::addr($nativeMessage));
        } else {
            $message = new Message($nativeMessage);

            Library::rd_kafka_message_destroy($nativeMessage);
        }

        return $message;
    }

    /**
     * @throws Exception
     */
    public function subscribe(array $topics): void
    {
        $nativeTopicPartitionList = Library::rd_kafka_topic_partition_list_new(count($topics));

        foreach ($topics as $topic) {
            Library::rd_kafka_topic_partition_list_add(
                $nativeTopicPartitionList,
                (string) $topic,
                RD_KAFKA_PARTITION_UA
            );
        }

        $err = Library::rd_kafka_subscribe($this->kafka, $nativeTopicPartitionList);

        Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw Exception::fromError($err);
        }
    }

    /**
     * @throws Exception
     */
    public function unsubscribe(): void
    {
        $err = Library::rd_kafka_unsubscribe($this->kafka);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw Exception::fromError($err);
        }
    }

    /**
     * @throws Exception
     */
    public function getSubscription(): array
    {
        $nativeTopicPartitionList = Library::rd_kafka_topic_partition_list_new(0);

        $err = Library::rd_kafka_subscription($this->kafka, FFI::addr($nativeTopicPartitionList));

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
            throw Exception::fromError($err);
        }

        $topicPartitionList = TopicPartitionList::fromCData($nativeTopicPartitionList);

        Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);

        $topics = [];
        foreach ($topicPartitionList as $topicPartition) {
            $topics[] = $topicPartition->getTopic();
        }

        return $topics;
    }

    /**
     * @throws \Exception
     */
    public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic
    {
        return new KafkaConsumerTopic($this, $topic_name, $topic_conf);
    }

    /**
     * @param TopicPartition[] $topics
     * @return TopicPartition[]
     * @throws Exception
     */
    public function getCommittedOffsets(array $topics, int $timeout_ms): array
    {
        $topicPartitionList = new TopicPartitionList(...$topics);
        $nativeTopicPartitionList = $topicPartitionList->getCData();

        $err = Library::rd_kafka_committed($this->kafka, $nativeTopicPartitionList, $timeout_ms);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
            throw Exception::fromError($err);
        }

        $topicPartitionList = TopicPartitionList::fromCData($nativeTopicPartitionList);

        if ($nativeTopicPartitionList !== null) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
        }

        return $topicPartitionList->asArray();
    }

    /**
     * @param TopicPartition[] $topicPartitions
     * @return TopicPartition[]
     * @throws Exception
     */
    public function offsetsForTimes(array $topicPartitions, int $timeout_ms): array
    {
        $topicPartitionList = new TopicPartitionList(...$topicPartitions);
        $nativeTopicPartitionList = $topicPartitionList->getCData();

        $err = Library::rd_kafka_offsets_for_times($this->kafka, $nativeTopicPartitionList, $timeout_ms);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
            throw Exception::fromError($err);
        }

        $topicPartitionList = TopicPartitionList::fromCData($nativeTopicPartitionList);

        if ($nativeTopicPartitionList !== null) {
            Library::rd_kafka_topic_partition_list_destroy($nativeTopicPartitionList);
        }

        return $topicPartitionList->asArray();
    }
}