onebeyond/onebeyond-studio-core

View on GitHub
src/OneBeyond.Studio.Infrastructure.Azure/MessageQueues/StorageLargeMessageQueueBase.cs

Summary

Maintainability
A
0 mins
Test Coverage
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Queues;
using EnsureThat;
using Newtonsoft.Json;
using Nito.AsyncEx;
using OneBeyond.Studio.Crosscuts.MessageQueues;
using OneBeyond.Studio.Infrastructure.Azure.MessageQueues.Options;
using OneBeyond.Studio.Infrastructure.Azure.Storage;

namespace OneBeyond.Studio.Infrastructure.Azure.MessageQueues;

internal abstract class StorageLargeMessageQueueBase<TMessage> : IMessageQueue<TMessage>
{
    private readonly AsyncLazy<BlobContainerClient> _messageBlobContainer;
    private readonly AsyncLazy<QueueClient> _messageQueue;

    protected StorageLargeMessageQueueBase(AzureLargeMessageQueueOptions options)
    {
        EnsureArg.IsNotNull(options, nameof(options));

        options.EnsureIsValid();

        _messageBlobContainer = new AsyncLazy<BlobContainerClient>(
            () => BlobContainer.GetOrCreateAsync(options.ResourceName, options.ConnectionString, options.ContainerName!),
            AsyncLazyFlags.RetryOnFailure);

        _messageQueue = new AsyncLazy<QueueClient>(
            () => Queue.GetOrCreateAsync(options.ResourceName, options.ConnectionString, options.QueueName!),
            AsyncLazyFlags.RetryOnFailure);
    }

    public async Task PublishAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        var messageJson = JsonConvert.SerializeObject(message);

        var messageJsonBytes = Encoding.UTF8.GetBytes(messageJson);

        var messageBlobContainer = await _messageBlobContainer.Task.ConfigureAwait(false);

        var messageBlobName = Guid.NewGuid().ToString();

        //Line below required due to https://stackoverflow.com/questions/65041620/model-binding-issue-in-azure-function-after-switching-to-azure-storage-queues
        var messageBlobNameAsBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(messageBlobName));

        await messageBlobContainer.UploadBlobAsync(
                messageBlobName,
                new MemoryStream(messageJsonBytes),
                cancellationToken)
            .ConfigureAwait(false);

        var messageQueue = await _messageQueue.Task.ConfigureAwait(false);

        await messageQueue.SendMessageAsync(
                messageBlobNameAsBase64,
                cancellationToken: cancellationToken)
            .ConfigureAwait(false);
    }
}