pixelfederation/swoole-bundle

View on GitHub
src/Bridge/Doctrine/DoctrineProcessor.php

Summary

Maintainability
A
2 hrs
Test Coverage
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Doctrine;

use K911\Swoole\Bridge\Doctrine\DBAL\ConnectionKeepAliveResetter;
use K911\Swoole\Bridge\Doctrine\ORM\EntityManagerResetter;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\CompileProcessor;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\Proxifier;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\ContainerConstants;
use PixelFederation\DoctrineResettableEmBundle\DBAL\Connection\DBALPlatformAliveKeeper;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

final class DoctrineProcessor implements CompileProcessor
{
    /**
     * @param array{global_limit?: int, limits?: array<string, int>} $config
     */
    public function __construct(private array $config = [])
    {
    }

    public function process(ContainerBuilder $container, Proxifier $proxifier): void
    {
        /** @var array<string,string> $bundles */
        $bundles = $container->getParameter('kernel.bundles');

        if (!isset($bundles['DoctrineBundle'])) {
            return;
        }

        $doctrineDef = $container->findDefinition('doctrine');
        $doctrineDef->addTag(ContainerConstants::TAG_SAFE_STATEFUL_SERVICE);

        $entityManagers = $container->getParameter('doctrine.entity_managers');

        if (!\is_array($entityManagers)) {
            throw new \UnexpectedValueException('Cannot obtain array of entity managers.');
        }

        $connectionSvcIds = $container->getParameter('doctrine.connections');

        if (!\is_array($connectionSvcIds)) {
            throw new \UnexpectedValueException('Cannot obtain array of doctrine connections.');
        }

        $this->createEntityManagerResetterDefinition($container);
        $this->prepareConnectionsForProxification($container, $connectionSvcIds);

        foreach ($entityManagers as $emName => $emSvcId) {
            $emDef = $container->findDefinition($emSvcId);
            $emDef->setLazy(false); // no need for another level of proxy wihich is technically lazy itself
            $tagParams = ['resetter' => EntityManagerResetter::class];
            $limit = $this->getLimitFromEntityManagerConnection($container, $emDef);

            if (null !== $limit) {
                $tagParams['limit'] = $limit;
            }

            $emDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, $tagParams);
            $this->overrideEmConfigurator($container, $emDef);
            $this->decorateRepositoryFactory($container, $emName, $emSvcId);
        }

        $this->fixDebugDataHolderResetter($container, $proxifier);
    }

    private function createEntityManagerResetterDefinition(ContainerBuilder $container): void
    {
        $resetterDef = new Definition(EntityManagerResetter::class);
        $resetterDef->setClass(EntityManagerResetter::class);
        $container->setDefinition(EntityManagerResetter::class, $resetterDef);
    }

    private function overrideEmConfigurator(ContainerBuilder $container, Definition $emDef): void
    {
        $configuratorCallback = $emDef->getConfigurator();
        /** @var Reference $configuratorRef */
        $configuratorRef = $configuratorCallback[0];
        $newConfiguratorDefSvcId = sprintf('%s.swoole_coop.blocking', (string) $configuratorRef);
        $newConfiguratorDef = new Definition(BlockingProxyFactoryOverridingManagerConfigurator::class);
        $newConfiguratorDef->setArgument(0, $configuratorRef);
        $newConfiguratorDef->setArgument(1, new Reference('swoole_bundle.unmanaged_factory_first_time.locking'));
        $container->setDefinition($newConfiguratorDefSvcId, $newConfiguratorDef);
        $emDef->setConfigurator([new Reference($newConfiguratorDefSvcId), 'configure']);
    }

    private function prepareConnectionsForProxification(ContainerBuilder $container, array $connectionSvcIds): void
    {
        $dbalAliveKeeperDef = $container->findDefinition(DBALPlatformAliveKeeper::class);
        $aliveKeepers = $dbalAliveKeeperDef->getArgument(1);
        $dbalAliveKeeperDef->setArgument(1, []);

        foreach ($connectionSvcIds as $connectionName => $connectionSvcId) {
            $limit = $this->getConnectionLimit($connectionName);

            if (!$limit) {
                $limit = $this->getGlobalConnectionLimit();
            }

            $connectionDef = $container->findDefinition($connectionSvcId);
            $tagParams = [];

            if ($limit) {
                $tagParams['limit'] = $limit;
            }

            if (isset($aliveKeepers[$connectionName])) {
                $tagParams['resetter'] = $this->tryToCreateKeepAliveResetter(
                    $container,
                    $connectionName,
                    $aliveKeepers[$connectionName]
                );
            }

            $connectionDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, $tagParams);
        }
    }

    private function getLimitFromEntityManagerConnection(ContainerBuilder $container, Definition $emDef): ?int
    {
        /** @vat Reference $connRef */
        $connRef = $emDef->getArgument(0);
        $connDef = $container->findDefinition((string) $connRef);
        $statefulSvcTag = $connDef->getTag(ContainerConstants::TAG_STATEFUL_SERVICE);

        return $statefulSvcTag && isset($statefulSvcTag[0]['limit']) ? $statefulSvcTag[0]['limit'] : null;
    }

    private function fixDebugDataHolderResetter(ContainerBuilder $container, Proxifier $proxifier): void
    {
        if (!$container->has('doctrine.debug_data_holder')) {
            return;
        }

        $proxifier->proxifyService('doctrine.debug_data_holder');
        $resetterDef = $container->findDefinition('services_resetter');

        if ($resetterDef->hasTag('kernel.reset')) {
            return;
        }

        /** @var IteratorArgument $resetters */
        $resetters = $resetterDef->getArgument(0);
        $resetterValues = $resetters->getValues();
        $resetterValues['doctrine.debug_data_holder'] = new Reference('doctrine.debug_data_holder');
        $resetters->setValues($resetterValues);
        $resetMethods = $resetterDef->getArgument(1);
        $resetMethods['doctrine.debug_data_holder'] = ['reset'];
        $resetterDef->setArgument(1, $resetMethods);
    }

    private function decorateRepositoryFactory(ContainerBuilder $container, string $emName, string $emSvcId): void
    {
        $configuratorSvcId = sprintf('doctrine.orm.%s_configuration', $emName);
        $configuratorDef = $container->findDefinition($configuratorSvcId);

        $newRepoFactorySvcId = sprintf('%s.%s', ServicePooledRepositoryFactory::class, $emName);
        $repoFactoryDef = new Definition(ServicePooledRepositoryFactory::class);
        $container->setDefinition($newRepoFactorySvcId, $repoFactoryDef);

        $methodCalls = $configuratorDef->getMethodCalls();

        foreach ($methodCalls as $index => $methodCall) {
            if ('setRepositoryFactory' !== $methodCall[0]) {
                continue;
            }

            $originalFactorySvcId = (string) $methodCall[1][0];
            $repoFactoryDef->setArgument(0, new Reference($originalFactorySvcId));
            $repoFactoryDef->setArgument(1, new Reference($emSvcId));
            $methodCall[1] = [0 => new Reference($newRepoFactorySvcId)];
            $methodCalls[$index] = $methodCall;

            break;
        }
        $configuratorDef->setMethodCalls($methodCalls);
    }

    private function getGlobalConnectionLimit(): ?int
    {
        if (!isset($this->config['global_limit'])) {
            return null;
        }

        return $this->config['global_limit'];
    }

    private function getConnectionLimit(string $connectionName): ?int
    {
        if (!isset($this->config['limits'])) {
            return null;
        }

        if (!isset($this->config['limits'][$connectionName])) {
            return null;
        }

        return (int) $this->config['limits'][$connectionName];
    }

    private function tryToCreateKeepAliveResetter(
        ContainerBuilder $container,
        string $connectionName,
        Reference $aliveKeeperRef
    ): string {
        $resetterSvcId = sprintf(
            'swoole_bundle.coroutines_support.doctrine.connection_resetter.%s',
            $connectionName
        );
        $resetterDef = new Definition();
        $resetterDef->setClass(ConnectionKeepAliveResetter::class);
        $resetterDef->setArgument(0, $aliveKeeperRef);
        $resetterDef->setArgument(1, $connectionName);
        $container->setDefinition($resetterSvcId, $resetterDef);

        return $resetterSvcId;
    }
}