onebeyond/onebeyond-studio-core

View on GitHub
src/OneBeyond.Studio.Infrastructure.Azure/MessageQueues/ServiceBusPubSubMessageQueueBase.cs

Summary

Maintainability
A
2 hrs
Test Coverage
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using EnsureThat;
using Newtonsoft.Json.Linq;
using Nito.AsyncEx;
using OneBeyond.Studio.Crosscuts.MessageQueues;
using OneBeyond.Studio.Infrastructure.Azure.MessageQueues.Options;

namespace OneBeyond.Studio.Infrastructure.Azure.MessageQueues;

internal abstract class ServiceBusPubSubMessageQueueBase<TMessage>
    : IMessageQueue<TMessage>
    , IAsyncDisposable
{

    protected ServiceBusPubSubMessageQueueBase(AzureServicePubSubBusMessageQueueOptions options)
    {
        EnsureArg.IsNotNull(options, nameof(options));

        options.EnsureIsValid();

        _ensureTopicExists = new AsyncLazy<bool>(
            async () =>
            {
                var serviceBusAdminClient = string.IsNullOrWhiteSpace(options.ResourceName)
                    ? new ServiceBusAdministrationClient(options.ConnectionString)
                    : new ServiceBusAdministrationClient($"{options.ResourceName}.servicebus.windows.net", new DefaultAzureCredential());

                var queueExists = await serviceBusAdminClient.TopicExistsAsync(
                        options.TopicName)
                    .ConfigureAwait(false);

                if (!queueExists)
                {
                    await serviceBusAdminClient.CreateTopicAsync(
                          new CreateTopicOptions(options.TopicName))
                      .ConfigureAwait(false);
                }

                return true;
            },
            AsyncLazyFlags.RetryOnFailure);

        _serviceBusClient = string.IsNullOrWhiteSpace(options.ResourceName)
                  ? new ServiceBusClient(options.ConnectionString)
                  : new ServiceBusClient($"{options.ResourceName}.servicebus.windows.net", new DefaultAzureCredential());

        _serviceBusSender = _serviceBusClient.CreateSender(options.TopicName);
    }

    private readonly AsyncLazy<bool> _ensureTopicExists;
    private readonly ServiceBusClient _serviceBusClient;
    private readonly ServiceBusSender _serviceBusSender;

    public ValueTask DisposeAsync()
        => _serviceBusClient.DisposeAsync();

    public async Task PublishAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        EnsureArg.HasValue(message, nameof(message));

        await _ensureTopicExists.Task.ConfigureAwait(false);
        var messageJson = JObject.FromObject(message);
        await _serviceBusSender.SendMessageAsync(new ServiceBusMessage(messageJson.ToString()),
                cancellationToken)
            .ConfigureAwait(false);
    }
}