CaffGeek/MBACNationals

View on GitHub
Edument.CQRS/MessageDispatcher.cs

Summary

Maintainability
A
1 hr
Test Coverage
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections;
using System.Reflection;

namespace Edument.CQRS
{
    /// <summary>
    /// This implements a basic message dispatcher, driving the overall command handling
    /// and event application/distribution process. It is suitable for a simple, single
    /// node application that can safely build its subscriber list at startup and keep
    /// it in memory. Depends on some kind of event storage mechanism.
    /// </summary>
    /// <remarks>
    /// Event delivery is serialized through a private lock, but registration
    /// (<see cref="AddHandlerFor{TCommand, TAggregate}"/>, <see cref="AddSubscriberFor{TEvent}"/>
    /// and the scan methods) takes no lock, so registration is expected to be
    /// finished before messages start flowing.
    /// </remarks>
    public class MessageDispatcher
    {
        /// <summary>
        /// Pairs a registered subscriber with the delegate that delivers an event to it.
        /// Keeping the subscriber explicitly lets the publish methods inspect it
        /// (read-model check, type-name match) without digging into the delegate's
        /// compiler-generated closure object.
        /// </summary>
        private sealed class Subscription
        {
            public readonly object Subscriber;
            public readonly Action<object> Deliver;

            public Subscription(object subscriber, Action<object> deliver)
            {
                Subscriber = subscriber;
                Deliver = deliver;
            }
        }

        // Command type -> the single handler registered for it.
        private readonly Dictionary<Type, Action<object>> commandHandlers = new Dictionary<Type, Action<object>>();

        // Event type -> subscriptions, in registration order.
        private readonly Dictionary<Type, List<Subscription>> eventSubscribers = new Dictionary<Type, List<Subscription>>();

        private readonly IEventStore eventStore;

        // Held while an event is being delivered to its subscribers.
        private readonly object eventPubLock = new object();

        /// <summary>
        /// True while <see cref="RegenerateReadModel"/> or <see cref="RegenerateReadModels"/>
        /// is replaying events; false otherwise.
        /// </summary>
        public bool IsRegenerating { get; private set; }

        /// <summary>
        /// Initializes a message dispatcher, which will use the specified event store
        /// implementation.
        /// </summary>
        /// <param name="es">The event store used to load and save events.</param>
        public MessageDispatcher(IEventStore es)
        {
            eventStore = es;
        }

        /// <summary>
        /// Tries to send the specified command to its handler. Throws an exception
        /// if there is no handler registered for the command.
        /// </summary>
        /// <remarks>
        /// The handler is looked up by the static type <typeparamref name="TCommand"/>,
        /// not by the runtime type of <paramref name="c"/>.
        /// </remarks>
        /// <typeparam name="TCommand">The command type used to locate the handler.</typeparam>
        /// <param name="c">The command to dispatch.</param>
        /// <exception cref="Exception">No handler is registered for <typeparamref name="TCommand"/>.</exception>
        public void SendCommand<TCommand>(TCommand c)
        {
            Action<object> handler;
            if (!commandHandlers.TryGetValue(typeof(TCommand), out handler))
            {
                throw new Exception("No command handler registered for " + typeof(TCommand).Name);
            }

            handler(c);
        }

        /// <summary>
        /// Publishes the specified event to all of its subscribers. After each
        /// subscriber has handled the event, a subscriber that implements
        /// <c>IReadModel</c> has its <c>Save</c> method called.
        /// </summary>
        /// <param name="e">The event to publish; subscribers are selected by its runtime type.</param>
        private void PublishEvent(object e)
        {
            List<Subscription> subscriptions;
            if (!eventSubscribers.TryGetValue(e.GetType(), out subscriptions))
            {
                return;
            }

            lock (eventPubLock)
            {
                foreach (var subscription in subscriptions)
                {
                    subscription.Deliver(e);

                    var readModel = subscription.Subscriber as IReadModel;
                    if (readModel != null)
                    {
                        readModel.Save();
                    }
                }
            }
        }

        /// <summary>
        /// Publishes the specified event to all of its subscribers without calling
        /// <c>Save</c> on read models afterwards.
        /// </summary>
        /// <param name="e">The event to publish; subscribers are selected by its runtime type.</param>
        private void PublishEventWithoutSave(object e)
        {
            List<Subscription> subscriptions;
            if (!eventSubscribers.TryGetValue(e.GetType(), out subscriptions))
            {
                return;
            }

            lock (eventPubLock)
            {
                foreach (var subscription in subscriptions)
                {
                    subscription.Deliver(e);
                }
            }
        }

        /// <summary>
        /// Publishes the specified event only to those subscribers whose type name
        /// matches <paramref name="readmodel"/> (case-insensitive, ordinal). No
        /// <c>Save</c> call is made.
        /// </summary>
        /// <param name="e">The event to publish; subscribers are selected by its runtime type.</param>
        /// <param name="readmodel">Simple type name (no namespace) of the target subscriber.</param>
        private void PublishEventToReadModel(object e, string readmodel)
        {
            List<Subscription> subscriptions;
            if (!eventSubscribers.TryGetValue(e.GetType(), out subscriptions))
            {
                return;
            }

            lock (eventPubLock)
            {
                foreach (var subscription in subscriptions)
                {
                    var subscriberName = subscription.Subscriber.GetType().Name;
                    if (string.Equals(subscriberName, readmodel, StringComparison.OrdinalIgnoreCase))
                    {
                        subscription.Deliver(e);
                    }
                }
            }
        }

        /// <summary>
        /// Adds an object that handles the specified command, by virtue of implementing
        /// the IHandleCommand interface.
        /// </summary>
        /// <typeparam name="TCommand">The command type being handled.</typeparam>
        /// <typeparam name="TAggregate">The aggregate type the command operates on.</typeparam>
        /// <param name="handler">The handler to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
        /// <exception cref="Exception">A handler is already registered for <typeparamref name="TCommand"/>.</exception>
        public void AddHandlerFor<TCommand, TAggregate>(IHandleCommand<TCommand, TAggregate> handler)
            where TAggregate : Aggregate, new()
        {
            // Fail at registration time instead of with a NullReferenceException
            // the first time the command is sent.
            if (handler == null)
            {
                throw new ArgumentNullException("handler");
            }

            if (commandHandlers.ContainsKey(typeof(TCommand)))
            {
                throw new Exception("Command handler already registered for " + typeof(TCommand).Name);
            }

            commandHandlers.Add(typeof(TCommand), c =>
                {
                    // Create up an empty aggregate.
                    var agg = new TAggregate();

                    // The aggregate loader function, when given an ID, loads and applies
                    // all of the events. It stashes the ID to indicate we loaded up the
                    // aggregate (and thus have to pay attention to version when we save
                    // back the events). Note that in the case where the aggregate's current
                    // state was never looked at, it doesn't matter when we apply the
                    // event; it has no dependency on the past.
                    Func<Guid, TAggregate> al = id =>
                        {
                            agg.Id = id;
                            agg.ApplyEvents(eventStore.LoadEventsFor<TAggregate>(id));
                            return agg;
                        };

                    // With everything set up, we invoke the command handler, collecting the
                    // events that it produces. The result is fully enumerated before
                    // anything is saved or published.
                    var resultEvents = new ArrayList();
                    foreach (var e in handler.Handle(al, (TCommand)c))
                    {
                        resultEvents.Add(e);
                    }

                    // Store the events in the event store.
                    eventStore.SaveEventsFor<TAggregate>(
                        agg.Id,
                        agg.EventsLoaded,
                        resultEvents);

                    // Publish them to all subscribers.
                    foreach (var e in resultEvents)
                    {
                        PublishEvent(e);
                    }
                });
        }

        /// <summary>
        /// Adds an object that subscribes to the specified event, by virtue of implementing
        /// the ISubscribeTo interface.
        /// </summary>
        /// <typeparam name="TEvent">The event type being subscribed to.</typeparam>
        /// <param name="subscriber">The subscriber to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
        public void AddSubscriberFor<TEvent>(ISubscribeTo<TEvent> subscriber)
        {
            // Fail at registration time instead of with a NullReferenceException
            // the first time a matching event is published.
            if (subscriber == null)
            {
                throw new ArgumentNullException("subscriber");
            }

            List<Subscription> subscriptions;
            if (!eventSubscribers.TryGetValue(typeof(TEvent), out subscriptions))
            {
                subscriptions = new List<Subscription>();
                eventSubscribers.Add(typeof(TEvent), subscriptions);
            }

            subscriptions.Add(new Subscription(subscriber, e => subscriber.Handle((TEvent)e)));
        }

        /// <summary>
        /// Looks through the specified assembly for all concrete types that implement
        /// the IHandleCommand or ISubscribeTo generic interfaces. Registers each of
        /// the implementations as a command handler or event subscriber.
        /// </summary>
        /// <remarks>
        /// A new instance is created for every matching interface, so a type that
        /// implements several of these interfaces is instantiated once per interface.
        /// Interfaces, abstract classes and open generic types are skipped because
        /// they cannot be instantiated.
        /// </remarks>
        /// <param name="ass">The assembly to scan.</param>
        public void ScanAssembly(Assembly ass)
        {
            var instantiableTypes = ass.GetTypes()
                .Where(t => !t.IsAbstract && !t.ContainsGenericParameters)
                .ToList();

            // Scan for and register handlers.
            foreach (var type in instantiableTypes)
            {
                foreach (var args in GenericArgumentsOf(type, typeof(IHandleCommand<,>)))
                {
                    RegisterHandler(args[0], args[1], CreateInstanceOf(type));
                }
            }

            // Scan for and register subscribers.
            foreach (var type in instantiableTypes)
            {
                foreach (var args in GenericArgumentsOf(type, typeof(ISubscribeTo<>)))
                {
                    RegisterSubscriber(args[0], CreateInstanceOf(type));
                }
            }
        }

        /// <summary>
        /// Looks at the specified object instance, examines what commands it handles
        /// or events it subscribes to, and registers it as a receiver/subscriber.
        /// </summary>
        /// <param name="instance">The handler and/or subscriber instance to register.</param>
        public void ScanInstance(object instance)
        {
            var instanceType = instance.GetType();

            // Scan for and register handlers.
            foreach (var args in GenericArgumentsOf(instanceType, typeof(IHandleCommand<,>)))
            {
                RegisterHandler(args[0], args[1], instance);
            }

            // Scan for and register subscribers.
            foreach (var args in GenericArgumentsOf(instanceType, typeof(ISubscribeTo<>)))
            {
                RegisterSubscriber(args[0], instance);
            }
        }

        /// <summary>
        /// Yields the generic arguments of every interface on <paramref name="implementation"/>
        /// that is a closed form of <paramref name="openInterface"/>.
        /// </summary>
        private static IEnumerable<Type[]> GenericArgumentsOf(Type implementation, Type openInterface)
        {
            return implementation.GetInterfaces()
                .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == openInterface)
                .Select(i => i.GetGenericArguments());
        }

        /// <summary>
        /// Calls AddHandlerFor through reflection, closing it over the given command
        /// and aggregate types.
        /// </summary>
        private void RegisterHandler(Type commandType, Type aggregateType, object instance)
        {
            this.GetType().GetMethod("AddHandlerFor")
                .MakeGenericMethod(commandType, aggregateType)
                .Invoke(this, new object[] { instance });
        }

        /// <summary>
        /// Calls AddSubscriberFor through reflection, closing it over the given event type.
        /// </summary>
        private void RegisterSubscriber(Type eventType, object instance)
        {
            this.GetType().GetMethod("AddSubscriberFor")
                .MakeGenericMethod(eventType)
                .Invoke(this, new object[] { instance });
        }

        /// <summary>
        /// Creates an instance of the specified type. If you are using some kind
        /// of DI container, and want to use it to create instances of the handler
        /// or subscriber, you can plug it in here.
        /// </summary>
        /// <param name="t">The type to instantiate.</param>
        /// <returns>A new instance of <paramref name="t"/>.</returns>
        private object CreateInstanceOf(Type t)
        {
            return Activator.CreateInstance(t);
        }

        /// <summary>
        /// Loads every event from the event store and publishes each one to all of
        /// its subscribers, saving read models after each delivery.
        /// </summary>
        public void RepublishEvents()
        {
            foreach (var e in eventStore.LoadAllEvents())
            {
                PublishEvent(e);
            }
        }

        /// <summary>
        /// Replays every stored event to the subscribers whose type name matches
        /// <paramref name="readmodel"/>. <see cref="IsRegenerating"/> is true for
        /// the duration of the replay.
        /// </summary>
        /// <param name="readmodel">Simple type name of the read model to regenerate (case-insensitive).</param>
        public void RegenerateReadModel(string readmodel)
        {
            this.IsRegenerating = true;
            try
            {
                foreach (var e in eventStore.LoadAllEvents())
                {
                    PublishEventToReadModel(e, readmodel);
                }
            }
            finally
            {
                // Reset even when the store or a subscriber throws, so the flag
                // cannot get stuck on true.
                this.IsRegenerating = false;
            }
        }

        /// <summary>
        /// Replays every stored event to all subscribers without saving read models
        /// after each delivery. <see cref="IsRegenerating"/> is true for the
        /// duration of the replay.
        /// </summary>
        public void RegenerateReadModels()
        {
            this.IsRegenerating = true;
            try
            {
                foreach (var e in eventStore.LoadAllEvents())
                {
                    PublishEventWithoutSave(e);
                }
            }
            finally
            {
                // Reset even when the store or a subscriber throws, so the flag
                // cannot get stuck on true.
                this.IsRegenerating = false;
            }
        }

        /// <summary>
        /// Creates an aggregate of the requested type, assigns it the given id and
        /// applies the events the event store returns for that id.
        /// </summary>
        /// <typeparam name="TAggregate">The aggregate type to build.</typeparam>
        /// <param name="id">The aggregate id whose events are loaded.</param>
        /// <returns>The aggregate after its events have been applied.</returns>
        public TAggregate Load<TAggregate>(Guid id)
            where TAggregate : Aggregate, new()
        {
            var agg = new TAggregate();
            agg.Id = id;
            agg.ApplyEvents(eventStore.LoadEventsFor<TAggregate>(id));
            return agg;
        }
    }
}