honeybee/honeybee

View on GitHub
src/Projection/EventHandler/ProjectionUpdater.php

Summary

Maintainability
D
1 day
Test Coverage
<?php

namespace Honeybee\Projection\EventHandler;

use Honeybee\Common\Error\RuntimeError;
use Honeybee\EntityTypeInterface;
use Honeybee\Infrastructure\Config\ConfigInterface;
use Honeybee\Infrastructure\DataAccess\DataAccessServiceInterface;
use Honeybee\Infrastructure\DataAccess\Query\AttributeCriteria;
use Honeybee\Infrastructure\DataAccess\Query\Comparison\Equals;
use Honeybee\Infrastructure\DataAccess\Query\CriteriaList;
use Honeybee\Infrastructure\DataAccess\Query\CriteriaQuery;
use Honeybee\Infrastructure\Event\Bus\Channel\ChannelMap;
use Honeybee\Infrastructure\Event\Bus\EventBusInterface;
use Honeybee\Infrastructure\Event\EventHandler;
use Honeybee\Infrastructure\Event\EventInterface;
use Honeybee\Model\Aggregate\AggregateRootTypeMap;
use Honeybee\Model\Event\AggregateRootEventInterface;
use Honeybee\Model\Event\EmbeddedEntityEventInterface;
use Honeybee\Model\Event\EmbeddedEntityEventList;
use Honeybee\Model\Task\CreateAggregateRoot\AggregateRootCreatedEvent;
use Honeybee\Model\Task\ModifyAggregateRoot\AddEmbeddedEntity\EmbeddedEntityAddedEvent;
use Honeybee\Model\Task\ModifyAggregateRoot\AggregateRootModifiedEvent;
use Honeybee\Model\Task\ModifyAggregateRoot\ModifyEmbeddedEntity\EmbeddedEntityModifiedEvent;
use Honeybee\Model\Task\ModifyAggregateRoot\RemoveEmbeddedEntity\EmbeddedEntityRemovedEvent;
use Honeybee\Model\Task\MoveAggregateRootNode\AggregateRootNodeMovedEvent;
use Honeybee\Model\Task\ProceedWorkflow\WorkflowProceededEvent;
use Honeybee\Projection\Event\ProjectionCreatedEvent;
use Honeybee\Projection\Event\ProjectionUpdatedEvent;
use Honeybee\Projection\ProjectionInterface;
use Honeybee\Projection\ProjectionMap;
use Honeybee\Projection\ProjectionTypeInterface;
use Honeybee\Projection\ProjectionTypeMap;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;
use Trellis\Runtime\Attribute\AttributeInterface;
use Trellis\Runtime\Entity\EntityInterface;
use Trellis\Runtime\Entity\EntityReferenceInterface;
use Trellis\Runtime\ReferencedEntityTypeInterface;

class ProjectionUpdater extends EventHandler
{
    protected $data_access_service;

    protected $projection_type_map;

    protected $aggregate_root_type_map;

    protected $event_bus;

    public function __construct(
        ConfigInterface $config,
        LoggerInterface $logger,
        DataAccessServiceInterface $data_access_service,
        ProjectionTypeMap $projection_type_map,
        AggregateRootTypeMap $aggregate_root_type_map,
        EventBusInterface $event_bus
    ) {
        parent::__construct($config, $logger);

        $this->data_access_service = $data_access_service;
        $this->projection_type_map = $projection_type_map;
        $this->aggregate_root_type_map = $aggregate_root_type_map;
        $this->event_bus = $event_bus;
    }

    public function handleEvent(EventInterface $event)
    {
        $affected_projections = $this->invokeEventHandler($event, 'on');

        // store updates and distribute projection update events
        $projection_type = $this->getProjectionType($event);
        $this->getStorageWriter($projection_type)->writeMany($affected_projections);

        foreach ($affected_projections as $affected_projection) {
            $projection_event_state = [
                'uuid' => Uuid::uuid4()->toString(),
                'projection_identifier' => $affected_projection->getIdentifier(),
                'projection_type' => $affected_projection->getType()->getVariantPrefix(),
                'data' => $affected_projection->toArray()
            ];

            $projection_event = $event instanceof AggregateRootCreatedEvent
                ? new ProjectionCreatedEvent($projection_event_state)
                : new ProjectionUpdatedEvent($projection_event_state);

            $this->event_bus->distribute(ChannelMap::CHANNEL_INFRA, $projection_event);
        }

        // call any dependent handlers
        foreach ($affected_projections as $affected_projection) {
            $this->invokeEventHandler($event, 'after', [ $affected_projection ]);
        }

        return $affected_projections->toList()->getFirst();
    }

    protected function onAggregateRootCreated(AggregateRootCreatedEvent $event)
    {
        $projection_data = $event->getData();
        $projection_data['identifier'] = $event->getAggregateRootIdentifier();
        $projection_data['revision'] = $event->getSeqNumber();
        $projection_data['created_at'] = $event->getDateTime();
        $projection_data['modified_at'] = $event->getDateTime();
        $projection_data['metadata'] = $event->getMetaData();

        $projection_type = $this->getProjectionType($event);

        if ($projection_type->isHierarchical()) {
            $parent_projection = null;
            if (isset($projection_data['parent_node_id'])) {
                $parent_projection = $this->loadProjection($projection_type, $projection_data['parent_node_id']);
            }
            $projection_data['materialized_path'] = $this->calculateMaterializedPath($parent_projection);
        }

        $new_projection = $projection_type->createEntity($projection_data);
        $this->handleEmbeddedEntityEvents($new_projection, $event->getEmbeddedEntityEvents());

        return new ProjectionMap([ $new_projection ]);
    }

    protected function onAggregateRootModified(AggregateRootModifiedEvent $event)
    {
        $projection_type = $this->getProjectionType($event);
        $updated_data = $this->loadProjection($projection_type, $event->getAggregateRootIdentifier())->toArray();

        foreach ($event->getData() as $attribute_name => $new_value) {
            $updated_data[$attribute_name] = $new_value;
        }
        $updated_data['revision'] = $event->getSeqNumber();
        $updated_data['modified_at'] = $event->getDateTime();
        $updated_data['metadata'] = array_merge($updated_data['metadata'], $event->getMetaData());

        $projection = $projection_type->createEntity($updated_data);
        $this->handleEmbeddedEntityEvents($projection, $event->getEmbeddedEntityEvents());

        return new ProjectionMap([ $projection ]);
    }

    protected function onWorkflowProceeded(WorkflowProceededEvent $event)
    {
        $projection_type = $this->getProjectionType($event);
        $projection = $this->loadProjection($projection_type, $event->getAggregateRootIdentifier());

        $updated_data = $projection->toArray();
        $updated_data['revision'] = $event->getSeqNumber();
        $updated_data['modified_at'] = $event->getDateTime();
        $updated_data['metadata'] = array_merge($updated_data['metadata'], $event->getMetaData());
        $updated_data['workflow_state'] = $event->getWorkflowState();
        $workflow_parameters = $event->getWorkflowParameters();
        if ($workflow_parameters !== null) {
            $updated_data['workflow_parameters'] = $workflow_parameters;
        }

        $projection = $projection_type->createEntity($updated_data);

        return new ProjectionMap([ $projection ]);
    }

    protected function onAggregateRootNodeMoved(AggregateRootNodeMovedEvent $event)
    {
        $projection_type = $this->getProjectionType($event);
        $projection = $this->loadProjection($projection_type, $event->getAggregateRootIdentifier());

        $parent_projection = null;
        if ($parent_identifier = $event->getParentNodeId()) {
            $parent_projection = $this->loadProjection($projection_type, $parent_identifier);
        }

        $updated_data = $projection->toArray();
        $updated_data['revision'] = $event->getSeqNumber();
        $updated_data['modified_at'] = $event->getDateTime();
        $updated_data['metadata'] = array_merge($updated_data['metadata'], $event->getMetaData());
        $updated_data['parent_node_id'] = $parent_identifier;
        $updated_data['materialized_path'] = $this->calculateMaterializedPath($parent_projection);

        $updated_projections = [ $projection_type->createEntity($updated_data) ];
        $updated_projections = array_merge(
            $updated_projections,
            $this->updateChildNodesAfterMovingParent($updated_projections[0])
        );

        return new ProjectionMap($updated_projections);
    }

    protected function updateChildNodesAfterMovingParent(EntityInterface $parent)
    {
        // find all existing children of the moved parent node
        $projection_type = $parent->getType();
        $parent_identifier = $parent->getIdentifier();
        $path = $this->calculateMaterializedPath($parent);

        $affected_relatives = [];
        $this->getQueryService($projection_type)->scroll(
            new CriteriaQuery(
                new CriteriaList,
                new CriteriaList(
                    [ new AttributeCriteria('materialized_path', new Equals($parent_identifier)) ]
                ),
                new CriteriaList,
                0,
                $this->config->get('batch_size', 1000)
            ),
            function (
                ProjectionInterface $projection
            ) use (
                &$affected_relatives,
                $parent_identifier,
                $projection_type,
                $path
            ) {
                // @note if there are many affected relatives this could consume memory
                $child_data = $projection->toArray();
                $pattern = '#.*' . $parent_identifier . '#';
                $child_data['materialized_path'] = preg_replace($pattern, $path, $projection->getMaterializedPath());
                $affected_relatives[] = $projection_type->createEntity($child_data);
            }
        );

        return $affected_relatives;
    }

    protected function calculateMaterializedPath(ProjectionInterface $parent = null)
    {
        $path_parts = [];
        if ($parent) {
            $parent_path = $parent->getMaterializedPath();
            if (!empty($parent_path)) {
                $path_parts = explode('/', $parent_path);
            }
            $path_parts[] = $parent->getIdentifier();
        }

        return implode('/', $path_parts);
    }

    protected function handleEmbeddedEntityEvents(EntityInterface $projection, EmbeddedEntityEventList $events)
    {
        foreach ($events as $event) {
            if ($event instanceof EmbeddedEntityAddedEvent) {
                $this->onEmbeddedEntityAdded($projection, $event);
            } elseif ($event instanceof EmbeddedEntityModifiedEvent) {
                $this->onEmbeddedEntityModified($projection, $event);
            } elseif ($event instanceof EmbeddedEntityRemovedEvent) {
                $this->onEmbeddedEntityRemoved($projection, $event);
            } else {
                throw new RuntimeError(
                    sprintf(
                        'Unsupported domain event-type given. Supported default event-types are: %s.',
                        implode(
                            ', ',
                            [
                                EmbeddedEntityAddedEvent::CLASS,
                                EmbeddedEntityModifiedEvent::CLASS,
                                EmbeddedEntityRemovedEvent::CLASS
                            ]
                        )
                    )
                );
            }
        }
    }

    protected function onEmbeddedEntityAdded(EntityInterface $projection, EmbeddedEntityAddedEvent $event)
    {
        $embedded_projection_attr = $projection->getType()->getAttribute($event->getParentAttributeName());
        $embedded_projection_type = $this->getEmbeddedEntityType($projection, $event);
        $embedded_projection = $embedded_projection_type->createEntity($event->getData(), $projection);
        $projection_list = $projection->getValue($embedded_projection_attr->getName());
        if ($embedded_projection_type instanceof ReferencedEntityTypeInterface) {
            $embedded_projection = $this->mirrorReferencedProjection($embedded_projection);
        }

        $projection_list->insertAt($event->getPosition(), $embedded_projection);

        $this->handleEmbeddedEntityEvents($embedded_projection, $event->getEmbeddedEntityEvents());
    }

    protected function onEmbeddedEntityModified(EntityInterface $projection, EmbeddedEntityModifiedEvent $event)
    {
        $embedded_projection_attr = $projection->getType()->getAttribute($event->getParentAttributeName());
        $embedded_projection_type = $this->getEmbeddedEntityType($projection, $event);

        $embedded_projections = $projection->getValue($embedded_projection_attr->getName());
        $projection_to_modify = null;
        foreach ($embedded_projections as $embedded_projection) {
            if ($embedded_projection->getIdentifier() === $event->getEmbeddedEntityIdentifier()) {
                $projection_to_modify = $embedded_projection;
            }
        }

        if ($projection_to_modify) {
            $embedded_projections->removeItem($projection_to_modify);
            $projection_to_modify = $embedded_projection_type->createEntity(
                array_merge($projection_to_modify->toArray(), $event->getData()),
                $projection
            );

            if ($embedded_projection_type instanceof ReferencedEntityTypeInterface) {
                $projection_to_modify = $this->mirrorReferencedProjection($projection_to_modify);
            }

            $embedded_projections->insertAt($event->getPosition(), $projection_to_modify);
            $this->handleEmbeddedEntityEvents($projection_to_modify, $event->getEmbeddedEntityEvents());
        }
    }

    protected function onEmbeddedEntityRemoved(EntityInterface $projection, EmbeddedEntityRemovedEvent $event)
    {
        $projection_list = $projection->getValue($event->getParentAttributeName());
        $projection_to_remove = null;

        foreach ($projection_list as $embedded_projection) {
            if ($embedded_projection->getIdentifier() === $event->getEmbeddedEntityIdentifier()) {
                $projection_to_remove = $embedded_projection;
            }
        }

        if ($projection_to_remove) {
            $projection_list->removeItem($projection_to_remove);
        }
    }

    /**
     * Evaluate and updated mirrored values from a loaded referenced projection
     */
    protected function mirrorReferencedProjection(EntityReferenceInterface $projection)
    {
        $projection_type = $projection->getType();
        $mirrored_attribute_map = $projection_type->getAttributes()->filter(
            function (AttributeInterface $attribute) {
                return (bool)$attribute->getOption('mirrored', false) === true;
            }
        );

        // Don't need to load a referenced entity if the mirrored attribute map is empty
        if ($mirrored_attribute_map->isEmpty()) {
            return $projection;
        }

        // Load the referenced projection to mirror values from
        $referenced_type = $this->projection_type_map->getByClassName(
            $projection_type->getReferencedTypeClass()
        );
        $referenced_identifier = $projection->getReferencedIdentifier();
        if (empty($referenced_identifier)) {
            $this->logger->error('[Zombie Alarm] RefId EMPTY: '.json_encode($projection->getRoot()->toArray()));
            return $projection;
        }

        if ($referenced_identifier === $projection->getRoot()->getIdentifier()) {
            $referenced_projection = $projection->getRoot(); // self reference, no need to load
        } else {
            $referenced_projection = $this->loadReferencedProjection($referenced_type, $referenced_identifier);
            if (!$referenced_projection) {
                $this->logger->debug('[Zombie Alarm] Unable to load referenced projection: ' . $referenced_identifier);
                return $projection;
            }
        }

        // Add default attribute values
        $mirrored_values['@type'] = $projection_type->getPrefix();
        $mirrored_values['identifier'] = $projection->getIdentifier();
        $mirrored_values['referenced_identifier'] = $projection->getReferencedIdentifier();
        $mirrored_values = array_merge(
            $projection->createMirrorFrom($referenced_projection)->toArray(),
            $mirrored_values
        );

        return $projection_type->createEntity($mirrored_values, $projection->getParent());
    }

    protected function loadProjection(ProjectionTypeInterface $projection_type, $identifier)
    {
        return $this->getStorageReader($projection_type)->read($identifier);
    }

    protected function loadReferencedProjection(EntityTypeInterface $referenced_type, $identifier)
    {
        $search_result = $this->getFinder($referenced_type)->getByIdentifier($identifier);
        if (!$search_result->hasResults()) {
            return null;
        }
        return $search_result->getFirstResult();
    }

    protected function getEmbeddedEntityType(EntityInterface $projection, EmbeddedEntityEventInterface $event)
    {
        $embed_attribute = $projection->getType()->getAttribute($event->getParentAttributeName());
        return $embed_attribute->getEmbeddedTypeByPrefix($event->getEmbeddedEntityType());
    }

    protected function getProjectionType(AggregateRootEventInterface $event)
    {
        return $this->projection_type_map->getByAggregateRootType(
            $this->aggregate_root_type_map->getItem($event->getAggregateRootType())
        );
    }

    protected function getQueryService(ProjectionTypeInterface $projection_type)
    {
        return $this->getDataAccessComponent($projection_type, 'query_service');
    }

    protected function getStorageWriter(ProjectionTypeInterface $projection_type)
    {
        return $this->getDataAccessComponent($projection_type, 'writer');
    }

    protected function getStorageReader(ProjectionTypeInterface $projection_type)
    {
        return $this->getDataAccessComponent($projection_type, 'reader');
    }

    protected function getFinder(ProjectionTypeInterface $projection_type)
    {
        return $this->getDataAccessComponent($projection_type, 'finder');
    }

    protected function getDataAccessComponent(ProjectionTypeInterface $projection_type, $component)
    {
        $default_component_name = sprintf('%s::view_store::%s', $projection_type->getVariantPrefix(), $component);
        $custom_component_option = $projection_type->getPrefix() . '.' . $component;

        switch ($component) {
            case 'finder':
                return $this->data_access_service->getFinder(
                    $this->config->get($custom_component_option, $default_component_name)
                );
                break;
            case 'reader':
                return $this->data_access_service->getStorageReader(
                    $this->config->get($custom_component_option, $default_component_name)
                );
                break;
            case 'writer':
                return $this->data_access_service->getStorageWriter(
                    $this->config->get($custom_component_option, $default_component_name)
                );
                break;
            case 'query_service':
                return $this->data_access_service->getQueryService(
                    $this->config->get($custom_component_option, $default_component_name)
                );
                break;
        }

        throw new RuntimeError('Invalid data access component name given: ' . $component);
    }
}