onebeyond/onebeyond-studio-core

View on GitHub
src/OneBeyond.Studio.DataAccess.EFCore/DomainEvents/RelationalDbRaisedDomainEventReceiver.cs

Summary

Maintainability
A
1 hr
Test Coverage
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Autofac.Features.OwnedInstances;
using EnsureThat;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using OneBeyond.Studio.Application.SharedKernel.DomainEvents;
using OneBeyond.Studio.Crosscuts.Activities;
using OneBeyond.Studio.Crosscuts.Exceptions;
using OneBeyond.Studio.Crosscuts.Logging;

namespace OneBeyond.Studio.DataAccess.EFCore.DomainEvents;

/// <summary>
/// Base class for receivers that poll a relational database for <see cref="RaisedDomainEvent"/> rows,
/// hand each one to a caller-supplied processor and delete it afterwards, all inside one transaction.
/// </summary>
/// <typeparam name="TDbContext">The EF Core context used to query and delete raised domain events.</typeparam>
internal abstract class RelationalDbRaisedDomainEventReceiver<TDbContext> : IRaisedDomainEventReceiver
    where TDbContext : DbContext
{
    private const string ProcessReceivedDomainEventActivityName = "OneBeyond.Studio.DataAccess.DomainEventProcessing";

    // How long to wait before polling again when no raised domain event was found.
    private static readonly TimeSpan IdlePollingDelay = TimeSpan.FromSeconds(3);

    private static readonly ILogger Logger = LogManager.CreateLogger<RelationalDbRaisedDomainEventReceiver<TDbContext>>();

    private readonly TDbContext _dbContext;

    /// <summary>
    /// Creates the receiver on top of the given owned DB context.
    /// </summary>
    /// <param name="dbContext">Owned handle whose <c>Value</c> is used for all database access.</param>
    protected RelationalDbRaisedDomainEventReceiver(Owned<TDbContext> dbContext)
    {
        EnsureArg.IsNotNull(dbContext, nameof(dbContext));

        // NOTE(review): only the context is kept; the Owned<T> handle itself is never disposed by this
        // class. Confirm the lifetime is released elsewhere, otherwise the owned scope leaks.
        _dbContext = dbContext.Value;
    }

    /// <summary>
    /// Raw SQL passed to <c>FromSqlRaw</c> to fetch the next <see cref="RaisedDomainEvent"/>.
    /// The name suggests it should lock the selected row for the transaction - confirm in implementations.
    /// </summary>
    protected abstract string SelectForUpdateSql { get; }

    /// <summary>
    /// Raw SQL executed after successful processing; it receives the event id as its single parameter.
    /// </summary>
    protected abstract string DeleteByIdSql { get; }

    /// <summary>
    /// Runs the receive loop until cancellation is requested or an error occurs.
    /// </summary>
    /// <param name="processAsync">Callback invoked for every received domain event.</param>
    /// <param name="cancellationToken">Token that stops the loop; cancellation makes the method return normally.</param>
    /// <returns>A task that completes when the loop has been cancelled.</returns>
    /// <remarks>
    /// Any exception other than a non-critical <see cref="OperationCanceledException"/> is logged and rethrown.
    /// Each iteration is executed through the context's execution strategy, so if that strategy retries,
    /// the whole iteration (including <paramref name="processAsync"/>) runs again.
    /// </remarks>
    public async Task RunAsync(
        Func<RaisedDomainEvent, CancellationToken, Task> processAsync,
        CancellationToken cancellationToken)
    {
        EnsureArg.IsNotNull(processAsync, nameof(processAsync));

        var strategy = _dbContext.Database.CreateExecutionStrategy();

        while (true)
        {
            try
            {
                // The token is handed to the strategy as well, so that it can observe cancellation
                // between attempts instead of only inside the operation.
                await strategy.ExecuteAsync(
                        async ct =>
                        {
                            var eventProcessed = await TryProcessNextAsync(processAsync, ct).ConfigureAwait(false);
                            if (!eventProcessed)
                            {
                                // The transaction is already disposed here, so nothing is held while idling.
                                await Task.Delay(IdlePollingDelay, ct).ConfigureAwait(false);
                            }
                        },
                        cancellationToken)
                    .ConfigureAwait(false);
            }
            catch (OperationCanceledException operationCanceledException)
            when (!operationCanceledException.IsCritical())
            {
                Logger.LogInformation("Execution of domain event receiver has been cancelled");
                return;
            }
            catch (Exception exception)
            {
                Logger.LogError(
                    exception,
                    "Execution of domain event receiver has terminated due to error");
                throw;
            }
        }
    }

    /// <summary>
    /// Fetches at most one raised domain event, processes it and deletes it within a single transaction.
    /// </summary>
    /// <returns><c>true</c> when an event was processed and committed; <c>false</c> when none was found.</returns>
    private async Task<bool> TryProcessNextAsync(
        Func<RaisedDomainEvent, CancellationToken, Task> processAsync,
        CancellationToken cancellationToken)
    {
        var transaction = await _dbContext.Database
            .BeginTransactionAsync(cancellationToken)
            .ConfigureAwait(false);

        // Asynchronous disposal: an uncommitted transaction is cleaned up without blocking the thread.
        await using (transaction.ConfigureAwait(false))
        {
            var raisedDomainEvent = await _dbContext.Set<RaisedDomainEvent>()
                .FromSqlRaw(SelectForUpdateSql)
                .AsNoTracking()
                .FirstOrDefaultAsync(cancellationToken)
                .ConfigureAwait(false);

            if (raisedDomainEvent is null)
            {
                return false;
            }

            // The activity is started from the identifiers stored with the event and stopped on dispose.
            using (var activity = new Activity(ProcessReceivedDomainEventActivityName)
                .DoStart(raisedDomainEvent.ActivityId, raisedDomainEvent.ActivityTraceState))
            {
                await processAsync(
                        raisedDomainEvent.Clone(activity),
                        cancellationToken)
                    .ConfigureAwait(false);
            }

            // Delete and commit do not observe the caller's token - presumably so that an event which has
            // already been processed is not left behind when cancellation arrives at this point.
            await _dbContext.Database
                .ExecuteSqlRawAsync(DeleteByIdSql, raisedDomainEvent.Id)
                .ConfigureAwait(false);
            await transaction.CommitAsync(CancellationToken.None).ConfigureAwait(false);

            return true;
        }
    }
}