dolittle/DotNET.SDK

View on GitHub
Source/Aggregates/Internal/AggregateWrapper.cs

Summary

Maintainability
A
0 mins
Test Coverage
F
0%
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Diagnostics;
using Dolittle.SDK.Events;
using Dolittle.SDK.Events.Builders;
using Dolittle.SDK.Events.Store;
using Dolittle.SDK.Events.Store.Builders;
using Microsoft.Extensions.Logging;
#pragma warning disable CS0618 // Refers to EventSourceId which is marked obsolete for clients. Should still be used internally

namespace Dolittle.SDK.Aggregates.Internal;

/// <summary>
/// Stateful wrapper for an aggregate root instance.
/// </summary>
/// <typeparam name="TAggregate"></typeparam>
class AggregateWrapper<TAggregate> where TAggregate : AggregateRoot
{
    readonly EventSourceId _eventSourceId;
    readonly IEventStore _eventStore;
    readonly IEventTypes _eventTypes;
    readonly IServiceProvider _serviceProvider;
    readonly ILogger _logger;

    TAggregate? _instance;

    /// <summary>
    /// Initializes a new instance of the <see cref="AggregateWrapper{TAggregate}"/> class.
    /// </summary>
    /// <param name="eventSourceId">The <see cref="EventSourceId"/> of the aggregate root instance.</param>
    /// <param name="eventStore">The <see cref="IEventStore" /> used for committing the <see cref="UncommittedAggregateEvents" /> when actions are performed on the <typeparamref name="TAggregate">aggregate</typeparamref>. </param>
    /// <param name="eventTypes">The <see cref="IEventTypes"/>.</param>
    /// <param name="serviceProvider">The tenant scoped <see cref="IServiceProvider"/>.</param>
    /// <param name="logger">The <see cref="ILogger" />.</param>
    public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
    {
        _eventSourceId = eventSourceId;
        _eventTypes = eventTypes;
        _eventStore = eventStore;
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    public async Task<TResponse> Perform<TResponse>(Func<TAggregate, TResponse> method, CancellationToken cancellationToken = default)
    {
        using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
            ?.Tag(_eventSourceId);

        try
        {
            _instance = await GetHydratedAggregate(cancellationToken);
            var aggregateRootId = _instance.AggregateRootId;
            activity?.Tag(aggregateRootId);
            _logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId);
            var result = method(_instance);
            if (_instance.AppliedEvents.Any())
            {
                await CommitAppliedEvents(_instance, aggregateRootId).ConfigureAwait(false);
                _instance.ClearAppliedEvents();
            }

            return result;
        }
        catch (Exception e)
        {
            _instance = null; // Reset the instance so that it will be rehydrated next time
            activity?.RecordError(e);
            throw new AggregateRootOperationFailed(typeof(TAggregate), _eventSourceId, e);
        }
    }
    
    public async Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
    {
        using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
            ?.Tag(_eventSourceId);

        try
        {
            _instance = await GetHydratedAggregate(cancellationToken);
            var aggregateRootId = _instance.AggregateRootId;
            activity?.Tag(aggregateRootId);
            _logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId);
            await method(_instance);
            if (_instance.AppliedEvents.Any())
            {
                await CommitAppliedEvents(_instance, aggregateRootId).ConfigureAwait(false);
                _instance.ClearAppliedEvents();
            }
        }
        catch (Exception e)
        {
            _instance = null; // Reset the instance so that it will be rehydrated next time
            activity?.RecordError(e);
            throw new AggregateRootOperationFailed(typeof(TAggregate), _eventSourceId, e);
        }
    }

    async ValueTask<TAggregate> GetHydratedAggregate(CancellationToken cancellationToken)
    {
        if (_instance is { } preHydratedAggregate) return preHydratedAggregate;

        if (!TryGetAggregateRoot(out var aggregateRoot, out var exception))
        {
            throw new CouldNotGetAggregateRoot(typeof(TAggregate), _eventSourceId, exception);
        }

        await Rehydrate(aggregateRoot, aggregateRoot.GetAggregateRootId(), cancellationToken);
        return aggregateRoot;
    }

    bool TryGetAggregateRoot(out TAggregate aggregateRoot, out Exception exception)
    {
        var getAggregateRoot = AggregateRootMetadata<TAggregate>.Construct(_serviceProvider, _eventSourceId);
        aggregateRoot = getAggregateRoot.Result;
        exception = getAggregateRoot.Exception;
        return getAggregateRoot.Success;
    }

    Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
    {
        var eventSourceId = aggregateRoot.EventSourceId;
        _logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
        var eventTypesToFetch = GetEventTypes(_eventTypes);
        var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
        return aggregateRoot.RehydrateInternal(committedEventsBatches, AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
    }

    static bool IsStateLess => AggregateRootMetadata<TAggregate>.IsStateLess;

    /// <summary>
    /// Gets all the <see cref="IEnumerable{T}"/> of <see cref="EventType"/> that the aggregates handles
    /// </summary>
    /// <param name="eventTypes"></param>
    /// <returns></returns>
    static IEnumerable<EventType> GetEventTypes(IEventTypes eventTypes)
        => IsStateLess
            ? Enumerable.Empty<EventType>()
            :  AggregateRootMetadata<TAggregate>.MethodsPerEventType.Keys.Select(eventTypes.GetFor);

    Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
    {
        _logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId);
        return _eventStore
            .ForAggregate(aggregateRootId)
            .WithEventSource(aggregateRoot.EventSourceId)
            .ExpectVersion(aggregateRoot.Version.Value - (ulong)aggregateRoot.AppliedEvents.Count())
            .Commit(builder => CreateUncommittedEvents(builder, aggregateRoot));
    }

    void CreateUncommittedEvents(UncommittedAggregateEventsBuilder builder, TAggregate aggregateRoot)
    {
        foreach (var appliedEvent in aggregateRoot.AppliedEvents)
        {
            var uncommittedEvent = ToUncommittedEvent(appliedEvent);
            var eventBuilder = uncommittedEvent.IsPublic
                ? builder.CreatePublicEvent(uncommittedEvent.Content)
                : builder.CreateEvent(uncommittedEvent.Content);
            eventBuilder.WithEventType(uncommittedEvent.EventType);
        }
    }

    UncommittedAggregateEvent ToUncommittedEvent(AppliedEvent appliedEvent)
    {
        var @event = appliedEvent.Event;
        var eventType = appliedEvent.EventType;
        if (appliedEvent.HasEventType)
        {
            ThrowIfWrongEventType(@event, eventType!);
        }
        else
        {
            eventType = _eventTypes.GetFor(@event.GetType());
        }

        return new UncommittedAggregateEvent(eventType!, @event, appliedEvent.Public);
    }

    void ThrowIfWrongEventType(object @event, EventType eventType)
    {
        var typeOfEvent = @event.GetType();
        if (!_eventTypes.HasFor(typeOfEvent))
        {
            return;
        }

        var associatedEventType = _eventTypes.GetFor(typeOfEvent);
        if (eventType != associatedEventType)
        {
            throw new ProvidedEventTypeDoesNotMatchEventTypeFromAttribute(eventType, associatedEventType, typeOfEvent);
        }
    }
}