onebeyond/onebeyond-studio-core

View on GitHub
src/OneBeyond.Studio.DataAccess.EFCore/IntegrationEvents/IntegrationEventsProcessor.cs

Summary

Maintainability
B
4 hrs
Test Coverage
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.EntityFrameworkCore;
using OneBeyond.Studio.Crosscuts.DynamicProxy;
using OneBeyond.Studio.Crosscuts.Reflection;
using OneBeyond.Studio.Crosscuts.Utilities.LogicalCallContext;
using OneBeyond.Studio.Domain.SharedKernel.Entities;
using OneBeyond.Studio.Application.SharedKernel.IntegrationEvents;
using OneBeyond.Studio.Domain.SharedKernel.IntegrationEvents;

namespace OneBeyond.Studio.DataAccess.EFCore.IntegrationEvents;

using EntityIntegrationEventList = IReadOnlyCollection<(DomainEntity Entity, IReadOnlyCollection<IntegrationEvent> IntegrationEvents)>;

internal sealed class IntegrationEventsProcessor : InterceptorBase
{
    private readonly IIntegrationEventDispatcher _integrationEventDispatcher;

    public IntegrationEventsProcessor(
        IIntegrationEventDispatcher integrationEventDispatcher)
    {
        EnsureArg.IsNotNull(integrationEventDispatcher, nameof(integrationEventDispatcher));

        _integrationEventDispatcher = integrationEventDispatcher;
    }

    protected override T Execute<T>(ISyncExecution<T> execution)
    {
        using (var saveChangesCall = InterceptSaveChangesCall(execution))
        {
            var entityIntegrationEvents = default(EntityIntegrationEventList);
            if (saveChangesCall?.IsFirstOnStack == true)
            {
                entityIntegrationEvents = CollectIntegrationEvents(saveChangesCall.DbContext);
                DispatchIntegrationEventsOnPreSaveAsync(entityIntegrationEvents)
                    .GetAwaiter()
                    .GetResult();
            }

            return base.Execute(execution);
        }
    }

    protected override async Task<T> ExecuteAsync<T>(IAsyncExecution<T> execution)
    {
        using (var saveChangesCall = InterceptSaveChangesCall(execution))
        {
            var entityIntegrationEvents = default(EntityIntegrationEventList);
            if (saveChangesCall?.IsFirstOnStack == true)
            {
                entityIntegrationEvents = CollectIntegrationEvents(saveChangesCall.DbContext);
                await DispatchIntegrationEventsOnPreSaveAsync(entityIntegrationEvents).ConfigureAwait(false);
            }

            return await base.ExecuteAsync(execution).ConfigureAwait(false);
        }
    }

    private async Task DispatchIntegrationEventsOnPreSaveAsync(EntityIntegrationEventList entityIntegrationEvents)
    {
        foreach (var (_, integrationEvents) in entityIntegrationEvents)
        {
            foreach (var integrationEvent in integrationEvents)
            {
                await _integrationEventDispatcher.DispatchAsync(
                        integrationEvent,
                        CancellationToken.None)
                    .ConfigureAwait(false);
            }
        }
    }

    private static EntityIntegrationEventList CollectIntegrationEvents(DbContext dbContext)
    {
        IEnumerable<(DomainEntity entity, IReadOnlyCollection<IntegrationEvent> integrationEvents)> raisedIntegrationEvents =
            dbContext.ChangeTracker.Entries<DomainEntity>()
                .Select(
                    (entityEntry) =>
                    (
                        entityEntry.Entity,
                        entityEntry.Entity.ReleaseIntegrationEvents().ToList() as IReadOnlyCollection<IntegrationEvent>
                    ));
        raisedIntegrationEvents = raisedIntegrationEvents
            .Where(
                (entityDomainEvents) => entityDomainEvents.integrationEvents.Count > 0);
        return raisedIntegrationEvents
            .ToList();
    }

    private static SaveChangesCallIntegration? InterceptSaveChangesCall(IExecution execution)
    {
        return SaveChangesCallIntegration.MethodInfoList.Any(
                (methodInfo) =>
                    methodInfo.Equals(execution.Method.GetBaseDefinition())) // SaveChanges can be overriden by a derived class
            ? new SaveChangesCallIntegration((DbContext)execution.Target)
            : default;
    }

    private sealed class SaveChangesCallIntegration : IDisposable
    {
        public SaveChangesCallIntegration(DbContext dbContext)
        {
            EnsureArg.IsNotNull(dbContext, nameof(dbContext));

            DbContext = dbContext;
            if (LogicalCallContext.FindData<bool?>(SAVE_CHANGES_CALL_DATA) == true)
            {
                IsFirstOnStack = false;
            }
            else
            {
                LogicalCallContext.SetData(SAVE_CHANGES_CALL_DATA, true);
                IsFirstOnStack = true;
            }
        }

        private const string SAVE_CHANGES_CALL_DATA = nameof(SaveChangesCallIntegration);

        public static MethodInfo[] MethodInfoList { get; } = new[]
        {
                Reflector.MethodFrom(
                    (DbContext dbContext) => dbContext.SaveChanges()),
                Reflector.MethodFrom(
                    (DbContext dbContext) => dbContext.SaveChanges(default)),
                Reflector.MethodFrom(
                    (DbContext dbContext) => dbContext.SaveChangesAsync(default)),
                Reflector.MethodFrom(
                    (DbContext dbContext) => dbContext.SaveChangesAsync(default, default)),
            };

        public DbContext DbContext { get; }
        public bool IsFirstOnStack { get; }

        public void Dispose()
        {
            if (IsFirstOnStack)
            {
                LogicalCallContext.SetData(SAVE_CHANGES_CALL_DATA, null);
            }
        }
    }
}