dolittle/DotNET.SDK

View on GitHub
Source/Aggregates/Actors/AggregateActor.cs

Summary

Maintainability
A
0 mins
Test Coverage
F
0%
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Diagnostics;
using Dolittle.SDK.Aggregates.Internal;
using Dolittle.SDK.Async;
using Dolittle.SDK.Events;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;

namespace Dolittle.SDK.Aggregates.Actors;


delegate TimeSpan AggregateUnloadTimeout();

class Perform<TAggregate>(Func<TAggregate, Task> callback, CancellationToken cancellationToken) where TAggregate : AggregateRoot
{
    public Func<TAggregate, Task> Callback { get; } = callback;
    public CancellationToken CancellationToken { get; } = cancellationToken;
}

class PerformAndRespond<TAggregate>(Func<TAggregate, object?> callback, CancellationToken cancellationToken) where TAggregate : AggregateRoot
{
    public Func<TAggregate, object?> Callback { get; } = callback;
    public CancellationToken CancellationToken { get; } = cancellationToken;
}

class AggregateActor<TAggregate> : IActor where TAggregate : AggregateRoot
{
    readonly GetServiceProviderForTenant _getServiceProvider;
    readonly ILogger<AggregateActor<TAggregate>> _logger;
    AggregateWrapper<TAggregate>? _aggregateWrapper;

    EventSourceId? _eventSourceId;

    readonly TimeSpan _idleUnloadTimeout;

    internal AggregateActor(GetServiceProviderForTenant getServiceProvider, ILogger<AggregateActor<TAggregate>> logger, TimeSpan idleUnloadTimeout)
    {
        _getServiceProvider = getServiceProvider;
        _logger = logger;
        _idleUnloadTimeout = idleUnloadTimeout;
    }

    public Task ReceiveAsync(IContext context)
    {
        return context.Message switch
        {
            Started => OnStarted(context),
            Stopping => OnStopping(context),
            ReceiveTimeout => OnReceiveTimeout(context),
            Perform<TAggregate> msg => OnPerform(msg, context),
            PerformAndRespond<TAggregate> msg => OnPerformAndRespond(msg, context),
            _ => Task.CompletedTask
        };
    }

    Task OnStopping(IContext _)
    {
        _logger.UnloadingAggregate(typeof(TAggregate));
        return Task.CompletedTask;
    }

    static Task OnReceiveTimeout(IContext context)
    {
        context.Poison(context.Self);
        return Task.CompletedTask;
    }

    async Task OnStarted(IContext context)
    {
        try
        {
            var (tenantId, eventSourceId) = GetIdentifiers(context);

            _eventSourceId = eventSourceId;
            var serviceProvider = await _getServiceProvider(tenantId);
            _aggregateWrapper = ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
            if (_idleUnloadTimeout > TimeSpan.Zero)
            {
                context.SetReceiveTimeout(_idleUnloadTimeout);
            }
        }
        catch (Exception e)
        {
            _logger.FailedToCreate(e, typeof(TAggregate));
            Activity.Current?.RecordError(e);
            throw;
        }
    }

    static (TenantId, EventSourceId) GetIdentifiers(IContext context)
    {
        return ClusterIdentityMapper.GetTenantAndEventSourceId(context.ClusterIdentity()!);
    }

    async Task OnPerform(Perform<TAggregate> perform, IContext context)
    {
        try
        {
            await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken);
            context.Respond(new Try<bool>(true));
        }
        catch (Exception e)
        {
            Activity.Current?.RecordError(e);
            context.Respond(new Try<bool>(e));
        }
        finally
        {
            if (_idleUnloadTimeout == TimeSpan.Zero) // 0 means instantly unload
            {
                // ReSharper disable once MethodHasAsyncOverload - awaiting this will deadlock
                context.Poison(context.Self);
            }
        }
    }
    
    async Task OnPerformAndRespond(PerformAndRespond<TAggregate> performAndRespond, IContext context)
    {
        try
        {
            var response = await _aggregateWrapper!.Perform(performAndRespond.Callback, performAndRespond.CancellationToken);
            context.Respond(new Try<object?>(response));
        }
        catch (Exception e)
        {
            Activity.Current?.RecordError(e);
            context.Respond(new Try<object?>(e));
        }
        finally
        {
            if (_idleUnloadTimeout == TimeSpan.Zero) // 0 means instantly unload
            {
                // ReSharper disable once MethodHasAsyncOverload - awaiting this will deadlock
                context.Poison(context.Self);
            }
        }
    }
}