src/Merger/SchemaMerger.php
<?php
declare(strict_types=1);
namespace PhpKafka\PhpAvroSchemaGenerator\Merger;
use AvroSchema;
use AvroSchemaParseException;
use PhpKafka\PhpAvroSchemaGenerator\Avro\Avro;
use PhpKafka\PhpAvroSchemaGenerator\Exception\SchemaMergerException;
use PhpKafka\PhpAvroSchemaGenerator\Optimizer\OptimizerInterface;
use PhpKafka\PhpAvroSchemaGenerator\Registry\SchemaRegistryInterface;
use PhpKafka\PhpAvroSchemaGenerator\Schema\SchemaTemplateInterface;
use RuntimeException;
/**
 * Resolves schema templates by embedding the definitions of referenced schemas
 * (looked up in a schema registry) and exports the resulting definitions as
 * JSON files into an output directory.
 */
final class SchemaMerger implements SchemaMergerInterface
{
    /**
     * Message fragment used to recognise a parse failure that names a schema
     * which still has to be embedded; the remainder of the message is treated
     * as the schema id.
     */
    private const UNKNOWN_SCHEMA_MESSAGE_SUFFIX = ' is not a schema we know about.';

    /**
     * Message of the exception thrown when no schema registry has been set.
     */
    private const MISSING_REGISTRY_MESSAGE = 'Please set a SchemaRegistery for the merger';

    private string $outputDirectory;

    /**
     * Defaults to null so that the getter can be called safely before a
     * registry has been injected (an uninitialized typed property would
     * raise an Error on read).
     */
    private ?SchemaRegistryInterface $schemaRegistry = null;

    /**
     * Optimizers applied, in insertion order, to every resolved root schema.
     *
     * @var OptimizerInterface[]
     */
    private array $optimizers = [];

    /**
     * @param string $outputDirectory directory the exported schema files are written to
     */
    public function __construct(string $outputDirectory = '/tmp')
    {
        $this->outputDirectory = $outputDirectory;
    }

    /**
     * @return SchemaRegistryInterface|null the registry, or null when none has been set yet
     */
    public function getSchemaRegistry(): ?SchemaRegistryInterface
    {
        return $this->schemaRegistry;
    }

    /**
     * @param SchemaRegistryInterface $schemaRegistry registry used to look up root and embedded schemas
     */
    public function setSchemaRegistry(SchemaRegistryInterface $schemaRegistry): void
    {
        $this->schemaRegistry = $schemaRegistry;
    }

    /**
     * @return string directory the exported schema files are written to
     */
    public function getOutputDirectory(): string
    {
        return $this->outputDirectory;
    }

    /**
     * @param string $outputDirectory directory the exported schema files are written to
     */
    public function setOutputDirectory(string $outputDirectory): void
    {
        $this->outputDirectory = $outputDirectory;
    }

    /**
     * Repeatedly parses the root definition and, each time the parser reports
     * an unknown schema, replaces the reference to that schema with the
     * definition found in the registry, until the definition parses.
     *
     * @param SchemaTemplateInterface $rootSchemaTemplate template whose definition is resolved
     * @return SchemaTemplateInterface result of withSchemaDefinition() called with the resolved definition
     * @throws AvroSchemaParseException when parsing fails for any reason other than an unknown schema
     * @throws SchemaMergerException when a referenced schema is not in the registry or its
     *                               reference cannot be located in the definition
     * @throws RuntimeException when no schema registry has been set
     * @throws \JsonException when an embedded definition is not valid JSON
     */
    public function getResolvedSchemaTemplate(SchemaTemplateInterface $rootSchemaTemplate): SchemaTemplateInterface
    {
        $registry = $this->getSchemaRegistry();

        if (null === $registry) {
            throw new RuntimeException(self::MISSING_REGISTRY_MESSAGE);
        }

        $rootDefinition = $rootSchemaTemplate->getSchemaDefinition();

        do {
            $exceptionThrown = false;

            try {
                AvroSchema::parse($rootDefinition);
            } catch (AvroSchemaParseException $e) {
                // Only "unknown schema" failures can be fixed by embedding; everything else is fatal.
                if (false === strpos($e->getMessage(), self::UNKNOWN_SCHEMA_MESSAGE_SUFFIX)) {
                    throw $e;
                }

                $exceptionThrown = true;
                $schemaId = $this->getSchemaIdFromExceptionMessage($e->getMessage());
                $embeddedTemplate = $registry->getSchemaById($schemaId);

                if (null === $embeddedTemplate) {
                    throw new SchemaMergerException(
                        sprintf(SchemaMergerException::UNKNOWN_SCHEMA_TYPE_EXCEPTION_MESSAGE, $schemaId)
                    );
                }

                $rootDefinition = $this->replaceSchemaIdWithDefinition(
                    $rootDefinition,
                    $schemaId,
                    $embeddedTemplate->getSchemaDefinition()
                );
            }
        } while (true === $exceptionThrown);

        return $rootSchemaTemplate->withSchemaDefinition($rootDefinition);
    }

    /**
     * Extracts the schema id by stripping the "unknown schema" fragment from the message.
     */
    private function getSchemaIdFromExceptionMessage(string $exceptionMessage): string
    {
        return str_replace(self::UNKNOWN_SCHEMA_MESSAGE_SUFFIX, '', $exceptionMessage);
    }

    /**
     * Replaces the first occurrence of the double-quoted schema id in the root
     * definition with the embedded definition (stripped of "schema_level").
     *
     * @throws SchemaMergerException when the quoted schema id does not occur in the root definition
     * @throws \JsonException when the embedded definition is not valid JSON
     */
    private function replaceSchemaIdWithDefinition(
        string $rootDefinition,
        string $schemaId,
        string $embeddedDefinition
    ): string {
        $idString = '"' . $schemaId . '"';
        $pos = strpos($rootDefinition, $idString);

        // A missing reference must not be treated as offset 0: that would overwrite
        // the beginning of the root definition with the embedded one.
        if (false === $pos) {
            throw new SchemaMergerException(
                sprintf('Could not find a reference to schema "%s" in the schema definition', $schemaId)
            );
        }

        return substr_replace(
            $rootDefinition,
            $this->removeSchemaLevel($embeddedDefinition),
            $pos,
            strlen($idString)
        );
    }

    /**
     * Re-encodes the given JSON definition without its top-level "schema_level" key.
     * Definitions that do not decode to an array are re-encoded unchanged.
     *
     * @throws \JsonException when the definition cannot be decoded or re-encoded
     */
    private function removeSchemaLevel(string $embeddedDefinition): string
    {
        $schemaDefinition = $this->transformExportSchemaDefinition(
            json_decode($embeddedDefinition, true, 512, JSON_THROW_ON_ERROR)
        );

        return json_encode($schemaDefinition, JSON_THROW_ON_ERROR | JSON_PRESERVE_ZERO_FRACTION);
    }

    /**
     * Resolves, optimizes and exports every root schema of the registry.
     *
     * @param bool $prefixWithNamespace forwarded to exportSchema()
     * @param bool $useTemplateName forwarded to exportSchema()
     * @return int number of root schemas that were exported
     * @throws AvroSchemaParseException
     * @throws SchemaMergerException
     * @throws RuntimeException when no schema registry has been set
     * @throws \JsonException when a schema definition is not valid JSON
     */
    public function merge(
        bool $prefixWithNamespace = false,
        bool $useTemplateName = false
    ): int {
        $registry = $this->getSchemaRegistry();

        if (null === $registry) {
            throw new RuntimeException(self::MISSING_REGISTRY_MESSAGE);
        }

        $mergedFiles = 0;

        /** @var SchemaTemplateInterface $rootSchemaTemplate */
        foreach ($registry->getRootSchemas() as $rootSchemaTemplate) {
            $resolvedTemplate = $this->getResolvedSchemaTemplate($rootSchemaTemplate);

            foreach ($this->optimizers as $optimizer) {
                $resolvedTemplate = $optimizer->optimize($resolvedTemplate);
            }

            $this->exportSchema($resolvedTemplate, $prefixWithNamespace, $useTemplateName);
            ++$mergedFiles;
        }

        return $mergedFiles;
    }

    /**
     * Writes the template's definition (without "schema_level") as JSON into the
     * output directory, creating the directory when it does not exist.
     *
     * The file name is the template's own file name, unless $useTemplateName is
     * false and the template is not primitive; in that case it is built from the
     * definition's "name" (optionally prefixed with its "namespace") and the
     * Avro file extension.
     *
     * @param SchemaTemplateInterface $rootSchemaTemplate template to export
     * @param bool $prefixWithNamespace prefix the generated file name with "<namespace>."
     * @param bool $useTemplateName keep the template's file name instead of generating one
     * @return void
     * @throws \JsonException when the definition cannot be decoded or re-encoded
     */
    public function exportSchema(
        SchemaTemplateInterface $rootSchemaTemplate,
        bool $prefixWithNamespace = false,
        bool $useTemplateName = false
    ): void {
        // The flags are the fourth argument of json_decode(); the third one is the nesting depth.
        $rootSchemaDefinition = $this->transformExportSchemaDefinition(
            json_decode($rootSchemaTemplate->getSchemaDefinition(), true, 512, JSON_THROW_ON_ERROR)
        );

        $isPrimitive = $rootSchemaTemplate->isPrimitive();

        $prefix = '';

        if (true === $prefixWithNamespace && false === $isPrimitive) {
            $prefix = $rootSchemaDefinition['namespace'] . '.';
        }

        $schemaFilename = $rootSchemaTemplate->getFilename();

        if (false === $useTemplateName && false === $isPrimitive) {
            $schemaFilename = $prefix . $rootSchemaDefinition['name'] . '.' . Avro::FILE_EXTENSION;
        }

        $outputDirectory = $this->getOutputDirectory();

        if (false === file_exists($outputDirectory)) {
            // Recursive, so that nested output directories can be created in one go.
            mkdir($outputDirectory, 0777, true);
        }

        $fileContents = json_encode($rootSchemaDefinition, JSON_THROW_ON_ERROR | JSON_PRESERVE_ZERO_FRACTION);

        file_put_contents($outputDirectory . '/' . $schemaFilename, $fileContents);
    }

    /**
     * Removes the top-level "schema_level" key from array definitions; any
     * other value is returned unchanged.
     *
     * @param mixed $schemaDefinition decoded schema definition
     * @return mixed
     */
    private function transformExportSchemaDefinition($schemaDefinition)
    {
        if (is_array($schemaDefinition)) {
            unset($schemaDefinition['schema_level']);
        }

        return $schemaDefinition;
    }

    /**
     * Registers an optimizer that merge() applies to every resolved root schema.
     *
     * @param OptimizerInterface $optimizer
     */
    public function addOptimizer(OptimizerInterface $optimizer): void
    {
        $this->optimizers[] = $optimizer;
    }
}