guibranco/CrispyWaffle

View on GitHub
Src/CrispyWaffle.RabbitMQ/Log/RabbitMQLogProvider.cs

Summary

Maintainability
C
7 hrs
Test Coverage
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using CrispyWaffle.Extensions;
using CrispyWaffle.Infrastructure;
using CrispyWaffle.Log;
using CrispyWaffle.Log.Providers;
using CrispyWaffle.RabbitMQ.Utils.Communications;
using CrispyWaffle.Serialization;
using RabbitMQ.Client;

namespace CrispyWaffle.RabbitMQ.Log
{
    /// <summary>
    /// Class RabbitMQLogProvider.
    /// Implements the <see cref="ILogProvider" />
    /// Implements the <see cref="IDisposable" />.
    /// </summary>
    /// <seealso cref="ILogProvider" />
    /// <seealso cref="IDisposable" />
    public class RabbitMQLogProvider : ILogProvider, IDisposable
    {
        /// <summary>
        /// The level.
        /// </summary>
        private LogLevel _level;

        /// <summary>
        /// The connector.
        /// </summary>
        private readonly RabbitMQConnector _connector;

        /// <summary>
        /// The channel.
        /// </summary>
        private readonly IModel _channel;

        /// <summary>
        /// The cancellation token.
        /// </summary>
        private readonly CancellationToken _cancellationToken;

        /// <summary>
        /// The queue.
        /// </summary>
        private readonly ConcurrentQueue<string> _queue;

        /// <summary>
        /// Initializes a new instance of the <see cref="RabbitMQLogProvider" /> class.
        /// </summary>
        /// <param name="connector">The connector.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <exception cref="ArgumentNullException">connector.</exception>
        public RabbitMQLogProvider(RabbitMQConnector connector, CancellationToken cancellationToken)
        {
            _connector = connector ?? throw new ArgumentNullException(nameof(connector));
            _channel = _connector.ConnectionFactory.CreateConnection().CreateModel();
            _channel.ExchangeDeclare(_connector.DefaultExchangeName, ExchangeType.Fanout, true);
            _cancellationToken = cancellationToken;
            _queue = new ConcurrentQueue<string>();
            var thread = new Thread(Worker);
            thread.Start();
        }

        /// <summary>
        /// Finalizes an instance of the <see cref="RabbitMQLogProvider" /> class.
        /// </summary>
        ~RabbitMQLogProvider()
        {
            Dispose(false);
        }

        /// <summary>
        /// Workers this instance.
        /// </summary>
        private void Worker()
        {
            Thread.CurrentThread.Name = "Message queue RabbitMQ log provider worker";
            Thread.Sleep(1000);

            while (true)
            {
                while (_queue.Count > 0)
                {
                    if (!_queue.TryDequeue(out var message))
                    {
                        break;
                    }

                    PropagateMessageInternal(message);
                }

                if (_cancellationToken.IsCancellationRequested)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Propagates the message internal.
        /// </summary>
        /// <param name="message">The message.</param>
        private void PropagateMessageInternal(string message)
        {
            try
            {
                var body = Encoding.UTF8.GetBytes(message);
                _channel.BasicPublish(_connector.DefaultExchangeName, string.Empty, null, body);
            }
            catch (Exception e)
            {
                LogConsumer.Trace(e);
            }
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        private void Dispose(bool disposing)
        {
            if (!disposing)
            {
                return;
            }

            _channel.Close();
        }

        /// <summary>
        /// Serializes the specified level.
        /// </summary>
        /// <param name="level">The level.</param>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        /// <param name="identifier">The identifier.</param>
        /// <returns>System.String.</returns>
        private static string Serialize(
            LogLevel level,
            string category,
            string message,
            string identifier = null
        )
        {
            return (string)
                new LogMessage
                {
                    Application = EnvironmentHelper.ApplicationName,
                    Category = category,
                    Date = DateTime.Now,
                    Hostname = EnvironmentHelper.Host,
                    Id = Guid.NewGuid().ToString(),
                    IpAddress = EnvironmentHelper.IpAddress,
                    IpAddressRemote = EnvironmentHelper.IpAddressExternal,
                    Level = level.GetHumanReadableValue(),
                    Message = message,
                    MessageIdentifier = identifier,
                    Operation = EnvironmentHelper.Operation,
                    ProcessId = EnvironmentHelper.ProcessId,
                    UserAgent = EnvironmentHelper.UserAgent,
                    ThreadId = Environment.CurrentManagedThreadId,
                    ThreadName = Thread.CurrentThread.Name
                }.GetSerializer();
        }

        /// <summary>
        /// Propagates the internal.
        /// </summary>
        /// <param name="message">The message.</param>
        private void PropagateInternal(string message)
        {
            _queue.Enqueue(message);
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Sets the level.
        /// </summary>
        /// <param name="level">The level.</param>
        public void SetLevel(LogLevel level)
        {
            _level = level;
        }

        /// <summary>
        /// Fatal the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Fatal(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Fatal))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Fatal, category, message));
        }

        /// <summary>
        /// Errors the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Error(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Error))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Error, category, message));
        }

        /// <summary>
        /// Warnings the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Warning(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Warning))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Warning, category, message));
        }

        /// <summary>
        /// Information the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Info(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Info))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Info, category, message));
        }

        /// <summary>
        /// Traces the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Trace(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Trace))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Trace, category, message));
        }

        /// <summary>
        /// Traces the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        /// <param name="exception">The exception.</param>
        public void Trace(string category, string message, Exception exception)
        {
            if (!_level.HasFlag(LogLevel.Trace))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Trace, category, message));

            Trace(category, exception);
        }

        /// <summary>
        /// Traces the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="exception">The exception.</param>
        public void Trace(string category, Exception exception)
        {
            if (!_level.HasFlag(LogLevel.Trace))
            {
                return;
            }

            do
            {
                PropagateInternal(Serialize(LogLevel.Trace, category, exception.Message));
                PropagateInternal(Serialize(LogLevel.Trace, category, exception.StackTrace));

                exception = exception.InnerException;
            } while (exception != null);
        }

        /// <summary>
        /// Debugs the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="message">The message.</param>
        public void Debug(string category, string message)
        {
            if (!_level.HasFlag(LogLevel.Debug))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Debug, category, message));
        }

        /// <summary>
        /// Debugs the specified category.
        /// </summary>
        /// <param name="category">The category.</param>
        /// <param name="content">The content.</param>
        /// <param name="identifier">The identifier.</param>
        public void Debug(string category, string content, string identifier)
        {
            if (!_level.HasFlag(LogLevel.Debug))
            {
                return;
            }

            PropagateInternal(Serialize(LogLevel.Debug, category, content, identifier));
        }

        /// <summary>
        /// Debugs the specified category.
        /// </summary>
        /// <typeparam name="T">any class that can be serialized to the <paramref name="customFormat" /> serializer format.</typeparam>
        /// <param name="category">The category.</param>
        /// <param name="content">The content.</param>
        /// <param name="identifier">The identifier.</param>
        /// <param name="customFormat">The custom format.</param>
        public void Debug<T>(
            string category,
            T content,
            string identifier,
            SerializerFormat customFormat
        )
            where T : class, new()
        {
            if (!_level.HasFlag(LogLevel.Debug))
            {
                return;
            }

            string serialized;

            if (customFormat == SerializerFormat.None)
            {
                serialized = (string)content.GetSerializer();
            }
            else
            {
                serialized = (string)content.GetCustomSerializer(customFormat);
            }

            PropagateInternal(Serialize(LogLevel.Debug, category, serialized, identifier));
        }
    }
}