idealo/php-rdkafka-ffi

View on GitHub
src/RdKafka/Admin/NewTopic.php

Summary

Maintainability
C
1 day
Test Coverage
B
84%
<?php

declare(strict_types=1);

namespace RdKafka\Admin;

use FFI;
use FFI\CData;
use RdKafka\Exception;
use RdKafka\FFI\Library;
use function array_values;
use function count;

class NewTopic
{
    private ?CData $topic;

    public function __construct(string $name, int $num_partitions, int $replication_factor)
    {
        $errstr = Library::new('char[512]');
        $this->topic = Library::rd_kafka_NewTopic_new(
            $name,
            $num_partitions,
            $replication_factor,
            $errstr,
            FFI::sizeOf($errstr)
        );

        if ($this->topic === null) {
            throw new Exception(FFI::string($errstr));
        }
    }

    public function __destruct()
    {
        if ($this->topic === null) {
            return;
        }

        Library::rd_kafka_NewTopic_destroy($this->topic);
    }

    public function getCData(): CData
    {
        return $this->topic;
    }

    public function setReplicaAssignment(int $partition_id, array $broker_ids): void
    {
        if (empty($broker_ids) === true) {
            throw new \InvalidArgumentException('broker_ids array must not be empty');
        }

        foreach ($broker_ids as $key => $value) {
            if (is_int($value) === false) {
                throw new \InvalidArgumentException(
                    sprintf('broker_ids array element %s must be int', $key)
                );
            }
        }

        $brokerIdsCount = count($broker_ids);
        $brokerIds = Library::new('int*[' . $brokerIdsCount . ']');
        foreach (array_values($broker_ids) as $i => $broker_id) {
            $int = Library::new('int');
            $int->cdata = $broker_id;
            $brokerIds[$i] = FFI::addr($int);
        }

        $errstr = Library::new('char[512]');
        $err = (int) Library::rd_kafka_NewTopic_set_replica_assignment(
            $this->topic,
            $partition_id,
            $brokerIds[0],
            $brokerIdsCount,
            $errstr,
            FFI::sizeOf($errstr)
        );

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw new Exception(FFI::string($errstr));
        }
    }

    public function setConfig(string $name, string $value): void
    {
        $err = Library::rd_kafka_NewTopic_set_config($this->topic, $name, $value);

        if ($err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw Exception::fromError($err);
        }
    }
}