onebeyond/onebeyond-studio-core

View on GitHub
src/OneBeyond.Studio.Infrastructure.RabbitMQ/RabbitPubSubMessageQueueBase.cs

Summary

Maintainability
A
0 mins
Test Coverage
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Newtonsoft.Json;
using OneBeyond.Studio.Crosscuts.MessageQueues;
using OneBeyond.Studio.Infrastructure.RabbitMQ.Options;
using RabbitMQ.Client;

namespace OneBeyond.Studio.Infrastructure.RabbitMQ;

internal abstract class RabbitPubSubMessageQueueBase<TMessage>
    : IMessageQueue<TMessage>
    , IDisposable
{
    private readonly string _exchangeName;
    private readonly IConnection _connection;
    private IModel _channel;

    protected RabbitPubSubMessageQueueBase(RabbitPubSubMessageQueueOptions options)
    {
        EnsureArg.IsNotNull(options, nameof(options));
        EnsureArg.IsNotNull(options.Connection, nameof(options.Connection));
        EnsureArg.IsNotNullOrWhiteSpace(options.ExchangeName, nameof(options.ExchangeName));

        var connectionFactory = new ConnectionFactory
        {
            Uri = options.Connection
        };

        _exchangeName = options.ExchangeName!;
        _connection = connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, true, false);
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _channel.Close();
            _channel.Dispose();
            _connection.Close();
            _connection.Dispose();
        }
    }

    public Task PublishAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        var messageJson = JsonConvert.SerializeObject(message);
        var messageBytes = Encoding.UTF8.GetBytes(messageJson);

        lock (_connection)
        {
            if (_channel.IsClosed)
            {
                _channel = _connection.CreateModel();
            }

            _channel.BasicPublish(
                _exchangeName,
                string.Empty,
                default,
                messageBytes);
        }
        return Task.CompletedTask;
    }
}