Source/SDK/DolittleClient.cs
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
#if NET5_0_OR_GREATER
using System.Net.Http;
#endif
using System.Threading;
using System.Threading.Tasks;
using Dolittle.SDK.Aggregates;
using Dolittle.SDK.Aggregates.Builders;
using Dolittle.SDK.Aggregates.Internal;
using Dolittle.SDK.Builders;
using Dolittle.SDK.Common.ClientSetup;
using Dolittle.SDK.DependencyInversion;
using Dolittle.SDK.EventHorizon;
using Dolittle.SDK.Events;
using Dolittle.SDK.Events.Builders;
using Dolittle.SDK.Events.Filters.Builders;
using Dolittle.SDK.Events.Handling.Builder;
using Dolittle.SDK.Events.Processing;
using Dolittle.SDK.Events.Store.Builders;
using Dolittle.SDK.Events.Store.Converters;
using Dolittle.SDK.Handshake;
using Dolittle.SDK.Handshake.Internal;
using Dolittle.SDK.Projections.Builder;
using Dolittle.SDK.Projections.Store.Builders;
using Dolittle.SDK.Resources;
using Dolittle.SDK.Resources.Internal;
using Dolittle.SDK.Services;
using Dolittle.SDK.Tenancy;
using Dolittle.SDK.Tenancy.Client.Internal;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ExecutionContext = Dolittle.SDK.Execution.ExecutionContext;
namespace Dolittle.SDK;
/// <summary>
/// Represents the client for working with the Dolittle Runtime.
/// </summary>
public class DolittleClient : IDisposable, IDolittleClient
{
readonly ICoordinateProcessing _processingCoordinator = new ProcessingCoordinator();
readonly IResolveCallContext _callContextResolver = new CallContextResolver();
readonly IClientBuildResults _buildResults;
readonly IUnregisteredEventTypes _unregisteredEventTypes;
readonly IUnregisteredAggregateRoots _unregisteredAggregateRoots;
readonly IUnregisteredEventFilters _unregisteredEventFilters;
readonly IUnregisteredEventHandlers _unregisteredEventHandlers;
readonly IUnregisteredProjections _unregisteredProjections;
readonly SubscriptionsBuilder _eventHorizonsBuilder;
readonly EventSubscriptionRetryPolicy _eventHorizonRetryPolicy;
readonly SemaphoreSlim _connectLock = new(1, 1);
readonly TaskCompletionSource<bool> _connectedCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
IConvertEventsToProtobuf _eventsToProtobufConverter;
EventHorizons _eventHorizons;
IProjectionStoreBuilder _projectionStoreBuilder;
ITenantScopedProviders _services;
bool _disposed;
IEnumerable<Tenant> _tenants;
IResourcesBuilder _resources;
IEventStoreBuilder _eventStore;
IAggregatesBuilder _aggregates;
CancellationTokenSource _clientCancellationTokenSource;
EventToSDKConverter _eventToSDKConverter;
GrpcChannel _grpcChannel;
/// <summary>
/// Initializes a new instance of the <see cref="DolittleClient"/> class.
/// </summary>
/// <param name="buildResults">The <see cref="IClientBuildResults"/>.</param>
/// <param name="unregisteredEventTypes">The <see cref="IUnregisteredEventTypes"/>.</param>
/// <param name="unregisteredAggregateRoots">The <see cref="IUnregisteredAggregateRoots"/>.</param>
/// <param name="unregisteredEventFilters">The <see cref="IUnregisteredEventFilters"/>.</param>
/// <param name="unregisteredEventHandlers">The <see cref="EventHandlerBuilder"/>.</param>
/// <param name="unregisteredProjections">The <see cref="IUnregisteredProjections"/>.</param>
/// <param name="eventHorizonsBuilder">The <see cref="SubscriptionsBuilder"/>.</param>
/// <param name="eventHorizonRetryPolicy">The <see cref="EventSubscriptionRetryPolicy"/>.</param>
public DolittleClient(
IClientBuildResults buildResults,
IUnregisteredEventTypes unregisteredEventTypes,
IUnregisteredAggregateRoots unregisteredAggregateRoots,
IUnregisteredEventFilters unregisteredEventFilters,
IUnregisteredEventHandlers unregisteredEventHandlers,
IUnregisteredProjections unregisteredProjections,
SubscriptionsBuilder eventHorizonsBuilder,
EventSubscriptionRetryPolicy eventHorizonRetryPolicy)
{
_buildResults = buildResults;
_unregisteredEventTypes = unregisteredEventTypes;
EventTypes = _unregisteredEventTypes;
_unregisteredAggregateRoots = unregisteredAggregateRoots;
_unregisteredEventFilters = unregisteredEventFilters;
_unregisteredEventHandlers = unregisteredEventHandlers;
_unregisteredProjections = unregisteredProjections;
_eventHorizonsBuilder = eventHorizonsBuilder;
_eventHorizonRetryPolicy = eventHorizonRetryPolicy;
}
/// <inheritdoc />
public bool IsConnected { get; private set; }
/// <inheritdoc />
public Task Connected => _connectedCompletionSource.Task;
internal IAggregateRootTypes AggregateRootTypes => _unregisteredAggregateRoots;
internal IUnregisteredProjections ProjectionTypes => _unregisteredProjections;
/// <inheritdoc />
public IEventTypes EventTypes { get; }
/// <inheritdoc />
public IEventStoreBuilder EventStore
{
get => GetOrThrowIfNotConnected(_eventStore);
private set => _eventStore = value;
}
/// <inheritdoc />
public IAggregatesBuilder Aggregates
{
get => GetOrThrowIfNotConnected(_aggregates);
private set => _aggregates = value;
}
/// <inheritdoc />
public IEventHorizons EventHorizons
{
get => GetOrThrowIfNotConnected(_eventHorizons);
private set => _eventHorizons = value as EventHorizons;
}
/// <inheritdoc />
public IEnumerable<Tenant> Tenants
{
get => GetOrThrowIfNotConnected(_tenants);
private set => _tenants = value;
}
/// <inheritdoc />
public IResourcesBuilder Resources
{
get => GetOrThrowIfNotConnected(_resources);
private set => _resources = value;
}
/// <inheritdoc />
public IProjectionStoreBuilder Projections
{
get => GetOrThrowIfNotConnected(_projectionStoreBuilder);
private set => _projectionStoreBuilder = value;
}
/// <inheritdoc />
public ITenantScopedProviders Services
{
get => GetOrThrowIfNotConnected(_services);
private set => _services = value;
}
/// <summary>
/// Create a client builder for a Microservice.
/// </summary>
/// <param name="setup">The optional <see cref="SetupDolittleClient"/> callback.</param>
/// <returns>The built <see cref="IDolittleClient"/>.</returns>
public static IDolittleClient Setup(SetupDolittleClient? setup = default)
{
var builder = new SetupBuilder();
setup?.Invoke(builder);
return builder.Build();
}
/// <inheritdoc />
public Task<IDolittleClient> Connect(ConfigureDolittleClient configureClient, CancellationToken cancellationToken = default)
{
var configuration = new DolittleClientConfiguration();
configureClient?.Invoke(configuration);
return Connect(configuration, cancellationToken);
}
/// <inheritdoc />
public Task<IDolittleClient> Connect(CancellationToken cancellationToken = default)
=> Connect(new DolittleClientConfiguration(), cancellationToken);
/// <inheritdoc />
public async Task<IDolittleClient> Connect(DolittleClientConfiguration configuration, CancellationToken cancellationToken = default)
{
if (IsConnected)
{
throw new CannotConnectDolittleClientMultipleTimes();
}
await _connectLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (IsConnected)
{
throw new CannotConnectDolittleClientMultipleTimes();
}
AddDefaultsFromServiceProviderInConfiguration(configuration);
var loggerFactory = configuration.LoggerFactory;
if (loggerFactory is not null)
{
_buildResults.WriteTo(loggerFactory.CreateLogger<DolittleClient>());
}
_grpcChannel = GrpcChannel.ForAddress(
$"http://{configuration.RuntimeHost}:{configuration.RuntimePort}",
new GrpcChannelOptions
{
Credentials = ChannelCredentials.Insecure,
MaxReceiveMessageSize = 32 * 1024 * 1024,
MaxSendMessageSize = 32 * 1024 * 1024,
#if NET5_0_OR_GREATER
HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true
}
#endif
});
var methodCaller = new MethodCaller(_grpcChannel, configuration.RuntimeHost, configuration.RuntimePort);
var (executionContext, tenants, otlpEndpoint) =
await ConnectToRuntime(methodCaller, configuration, loggerFactory, cancellationToken).ConfigureAwait(false);
Tenants = tenants;
await CreateDependencies(methodCaller, configuration, loggerFactory, executionContext, tenants).ConfigureAwait(false);
ConfigureContainer(configuration);
await RegisterAllUnregistered(methodCaller, configuration.PingInterval, executionContext, loggerFactory).ConfigureAwait(false);
IsConnected = true;
_connectedCompletionSource.SetResult(true);
return this;
}
finally
{
_connectLock.Release();
}
}
Task<ConnectionResult> ConnectToRuntime(IPerformMethodCalls methodCaller, DolittleClientConfiguration configuration, ILoggerFactory loggerFactory,
CancellationToken cancellationToken)
{
var runtimeConnector = new DolittleRuntimeConnector(
configuration.RuntimeHost,
configuration.RuntimePort,
configuration.Version,
new HandshakeClient(methodCaller, loggerFactory.CreateLogger<HandshakeClient>()),
new TenantsClient(methodCaller, loggerFactory.CreateLogger<TenantsClient>()),
_buildResults,
loggerFactory.CreateLogger<DolittleRuntimeConnector>());
return runtimeConnector.ConnectForever(cancellationToken);
}
/// <inheritdoc />
public Task Disconnect(CancellationToken cancellationToken = default)
{
_clientCancellationTokenSource.Cancel();
return Task.WhenAny(_processingCoordinator.Completion, Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken));
}
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Dispose resources.
/// </summary>
/// <param name="disposeManagedResources">Whether to dispose managed resources.</param>
protected virtual void Dispose(bool disposeManagedResources)
{
if (_disposed)
{
return;
}
if (disposeManagedResources)
{
_services?.Dispose();
_clientCancellationTokenSource?.Dispose();
_eventHorizons?.Dispose();
_grpcChannel?.Dispose();
}
_disposed = true;
}
async Task CreateDependencies(
IPerformMethodCalls methodCaller,
DolittleClientConfiguration config,
ILoggerFactory loggerFactory,
ExecutionContext executionContext,
IEnumerable<Tenant> tenants)
{
_clientCancellationTokenSource = new CancellationTokenSource();
var serializer = new EventContentSerializer(_unregisteredEventTypes, config.EventSerializerProvider);
_eventsToProtobufConverter = new EventToProtobufConverter(serializer);
_eventToSDKConverter = new EventToSDKConverter(serializer);
IServiceProvider TenantServiceProvider(TenantId tenant) => Services.ForTenant(tenant);
EventStore = new EventStoreBuilder(
methodCaller,
_eventsToProtobufConverter,
_eventToSDKConverter,
new AggregateEventToProtobufConverter(serializer),
new AggregateEventToSDKConverter(serializer),
executionContext,
_callContextResolver,
_unregisteredEventTypes,
loggerFactory);
// Important to not send in the method group because it will crash because it calls get on the Services-property before it is ready.
#pragma warning disable IDE0200
Aggregates = new AggregatesBuilder(TenantServiceProvider);
#pragma warning restore IDE0200
EventHorizons = new EventHorizons(
methodCaller,
executionContext,
_eventHorizonRetryPolicy,
loggerFactory.CreateLogger<EventHorizons>());
Projections = new ProjectionStoreBuilder(
TenantServiceProvider,
executionContext,
_unregisteredProjections.ReadModelTypes,
loggerFactory);
Resources = await new ResourcesFetcher(
methodCaller,
executionContext,
config.ConfigureMongoClientSettings,
loggerFactory
).FetchResourcesFor(tenants, _clientCancellationTokenSource.Token).ConfigureAwait(false);
}
async Task RegisterAllUnregistered(IPerformMethodCalls methodCaller, TimeSpan pingInterval, ExecutionContext executionContext, ILoggerFactory loggerFactory)
{
_eventHorizonsBuilder.BuildAndSubscribe(_eventHorizons, _clientCancellationTokenSource.Token);
await _unregisteredEventTypes.Register(
new Events.Internal.EventTypesClient(
methodCaller,
executionContext,
loggerFactory.CreateLogger<Events.Internal.EventTypesClient>()),
_clientCancellationTokenSource.Token).ConfigureAwait(false);
await _unregisteredAggregateRoots.Register(
new AggregateRootsClient(
methodCaller,
executionContext,
loggerFactory.CreateLogger<AggregateRootsClient>()),
_clientCancellationTokenSource.Token).ConfigureAwait(false);
StartEventProcessors(methodCaller, pingInterval, executionContext, loggerFactory);
}
void StartEventProcessors(IPerformMethodCalls methodCaller, TimeSpan pingInterval, ExecutionContext executionContext, ILoggerFactory loggerFactory)
{
var reverseCallClientsCreator = new ReverseCallClientCreator(
pingInterval,
methodCaller,
executionContext,
_services,
loggerFactory);
var eventProcessors = new EventProcessors(
reverseCallClientsCreator,
_processingCoordinator,
loggerFactory.CreateLogger<EventProcessors>());
var eventProcessingConverter = new EventProcessingConverter(_eventToSDKConverter);
_unregisteredEventHandlers.Register(
eventProcessors,
eventProcessingConverter,
loggerFactory,
_clientCancellationTokenSource.Token);
_unregisteredEventFilters.Register(
eventProcessors,
eventProcessingConverter,
loggerFactory,
_clientCancellationTokenSource.Token);
_unregisteredProjections.Register(
eventProcessors,
eventProcessingConverter,
loggerFactory,
_clientCancellationTokenSource.Token);
}
TRequiresStartService GetOrThrowIfNotConnected<TRequiresStartService>(TRequiresStartService service)
{
if (!IsConnected)
{
throw new CannotUseUnconnectedDolittleClient();
}
return service;
}
static void AddDefaultsFromServiceProviderInConfiguration(DolittleClientConfiguration config)
{
var serviceProvider = config.ServiceProvider;
if (serviceProvider is null)
{
return;
}
if (config.LoggerFactory is null)
{
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();
config.WithLogging(loggerFactory ?? LoggerFactory.Create(_ =>
{
_.SetMinimumLevel(LogLevel.Information);
_.AddConsole();
}));
}
if (config.TenantServiceProviderFactory is null)
{
var providerFactory = serviceProvider.GetService<ICreateTenantContainers>();
config.WithTenantServiceProviderFactory(providerFactory is not null
? providerFactory.Create
: DefaultTenantServiceProviderFactory.Instance);
}
}
void ConfigureContainer(DolittleClientConfiguration config)
{
var builder = new TenantScopedProvidersBuilder(config.ServiceProvider, config.TenantServiceProviderFactory)
.AddTenantServices(AddBuilderServices)
.AddTenantServices((_, collection) => collection.AddScoped(services => services.GetRequiredService<IResources>().MongoDB.GetDatabase(config.ConfigureMongoDatabaseSettings)))
.AddTenantServices(_unregisteredEventHandlers.AddTenantScopedServices)
.AddTenantServices(_unregisteredAggregateRoots.AddTenantScopedServices)
.AddTenantServices(_unregisteredProjections.AddTenantScopedServices)
.AddTenantServices(config.ConfigureTenantServices);
Services = builder.Build(_tenants.Select(_ => _.Id).ToImmutableHashSet());
}
void AddBuilderServices(TenantId tenant, IServiceCollection services)
=> services
.AddScoped(_ => EventStore.ForTenant(tenant))
.AddScoped(_ => Aggregates.ForTenant(tenant))
.AddScoped(_ => Projections.ForTenant(tenant))
.AddScoped(_ => Resources.ForTenant(tenant));
}