Event Aggregator

Back in 2005, I wrote a lightweight event aggregator. It was nothing fancy, just a workhorse. I used that little class for a long, long time.

Recently, I needed an event aggregator again, so I pulled out that old source code, thinking I might be able to just use it. Well, it turns out time has not been kind to some of the design and technology choices I made back then. (Sigh).

Now, I realize there are all sorts of whiz-bang libraries out there for this sort of thing. I could easily just go use one of those. But, just like in 2005, I want fine-grained control over how my events are distributed throughout my project(s). Because of that, I decided to fire up my compiler and start coding a new version.

It turns out I’ve used Microsoft’s PRISM library for so long now that, every time I think about the concept of aggregating events, I seem to visualize something that looks suspiciously like the PRISM event aggregator. That’s fine though, since I can take that tiny part of the PRISM library and customize it to meet my individual needs. To be sure, the PRISM event aggregator has to do many things, behind the scenes, that mine won’t ever have to do. For that reason alone, I suspect mine will be much simpler than Microsoft’s. Probably not better, but at least simpler.

I’m fine with simple. Simple is good.

So here is an event aggregator library I’m in the process of creating. Let’s go look at how I put it together.

By the way, all the code for this library can be found HERE.

The NUGET package can be found HERE.

I started by defining an interface for my main abstraction. That interface looks like this:

public interface IEventAggregator
{
    TEvent GetEvent<TEvent>()
        where TEvent : EventBase;
}

This is the part that looks most like the PRISM event aggregator, to me. That’s because I’m used to dealing with this interface, so I see no reason to try to dream up a new workflow.

One thing I am doing differently than PRISM is that my GetEvent method doesn’t place the new() constraint on the TEvent type. That’s because most of my code runs in a hosted environment – whether as a website, service, desktop, or even console application. If I had followed PRISM’s lead and used the new() constraint, every event would need a public parameterless constructor, and I wouldn’t be able to take advantage of the DI container to inject services directly into my events. I want to be able to do that, so I changed that part.

I implemented the IEventAggregator interface using a class I named EventAggregator. That class looks like this:

public class EventAggregator : IEventAggregator
{
    private readonly ConcurrentDictionary<Type, EventBase> _events;
    private readonly IServiceProvider? _serviceProvider;

    public EventAggregator()
    {
        _serviceProvider = null;
        _events = new ConcurrentDictionary<Type, EventBase>();
    }

    public EventAggregator(
        IServiceProvider serviceProvider
        )
    {
        Guard.Instance().ThrowIfNull(serviceProvider, nameof(serviceProvider));
        _serviceProvider = serviceProvider;
        _events = new ConcurrentDictionary<Type, EventBase>();            
    }

    public virtual TEvent GetEvent<TEvent>() 
        where TEvent : EventBase
    {
        if (false == _events.TryGetValue(typeof(TEvent), out var ev))
        {
            if (null == _serviceProvider)
            {
                ev = Activator.CreateInstance<TEvent>();
            }
            else
            {
                ev = ActivatorUtilities.CreateInstance<TEvent>(
                    _serviceProvider
                    );
            }
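            // Another thread may have added the event in the meantime, so
            // keep whichever instance actually made it into the dictionary.
            ev = _events.GetOrAdd(typeof(TEvent), ev);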
        }
        return (TEvent)ev;
    }
}

Notice that the class has two constructors. The first one accepts no parameters and allows me to use the EventAggregator class in environments where I don’t have a DI container, or wherever it’s easiest to simply new up an instance directly – for instance, inside a simple console application.

The next constructor accepts an argument of type IServiceProvider. That flavor of constructor is called by the DI container whenever I use EventAggregator in a hosted environment, where I have access to a DI container – for instance, inside a Blazor website.

Looking at the GetEvent method, we see that if we have a service provider available, then we use it to create the event instance, through the call to ActivatorUtilities.CreateInstance. That call is important. It automatically makes any services registered with the service provider available to our concrete event classes, at construction time. That, in turn, means our concrete event objects can consume practically anything they require, directly from the DI container, in order to respond to our events. Anyway, that’s my plan. We’ll see how well that idea works out.
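To make the constructor injection idea concrete, here is a minimal sketch of an event that asks for a service in its constructor. The IClock, SystemClock, and OrderPlacedEvent names are made up for illustration, and the EventBase here is an empty stand-in for the real one. The sketch assumes the Microsoft.Extensions.DependencyInjection package is referenced.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical service, used only for this illustration.
public interface IClock { DateTime Now { get; } }
public sealed class SystemClock : IClock { public DateTime Now => DateTime.Now; }

// Empty stand-in for the library's EventBase, so the sketch compiles alone.
public abstract class EventBase { }

// Hypothetical event with a constructor dependency. Note there is no
// parameterless constructor, so a new() constraint would reject this type.
public sealed class OrderPlacedEvent : EventBase
{
    public IClock Clock { get; }

    public OrderPlacedEvent(IClock clock)
    {
        Clock = clock;
    }
}

public static class InjectionDemo
{
    public static OrderPlacedEvent Create()
    {
        var provider = new ServiceCollection()
            .AddSingleton<IClock, SystemClock>()
            .BuildServiceProvider();

        // The event type itself is not registered with the container.
        // ActivatorUtilities resolves the constructor arguments from the provider.
        return ActivatorUtilities.CreateInstance<OrderPlacedEvent>(provider);
    }
}
```

An event like this only works with the IServiceProvider flavor of the aggregator, since Activator.CreateInstance needs a parameterless constructor.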

If we didn’t have access to a service provider when the EventAggregator was created, then we fall back to creating event objects using the tried and true Activator.CreateInstance method. That approach works almost as well, except it requires a parameterless constructor on the event, so it won’t support injecting arbitrary types into the events, at runtime.

However we create the event instances, they all get stored in the dictionary named _events. That way, we create each event instance once and reuse it after that.
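The create-once-and-reuse behavior can be sketched on its own. This is a trimmed-down stand-in, not the library's class: MiniAggregator and PingEvent are names I made up, and the sketch uses ConcurrentDictionary.GetOrAdd rather than the TryGetValue approach shown above.

```csharp
using System;
using System.Collections.Concurrent;

// Empty stand-in for the library's EventBase, plus a hypothetical event.
public abstract class EventBase { }
public sealed class PingEvent : EventBase { }

// Trimmed-down stand-in for EventAggregator (no DI support).
public sealed class MiniAggregator
{
    private readonly ConcurrentDictionary<Type, EventBase> _events =
        new ConcurrentDictionary<Type, EventBase>();

    public TEvent GetEvent<TEvent>() where TEvent : EventBase
    {
        // GetOrAdd returns the instance that is stored in the dictionary.
        // Under a race the factory may run more than once, but every
        // caller still receives the same stored instance.
        return (TEvent)_events.GetOrAdd(
            typeof(TEvent),
            _ => Activator.CreateInstance<TEvent>());
    }
}

public static class ReuseDemo
{
    public static bool SameInstance()
    {
        var aggregator = new MiniAggregator();
        var first = aggregator.GetEvent<PingEvent>();
        var second = aggregator.GetEvent<PingEvent>();
        return ReferenceEquals(first, second);
    }
}
```

Reusing the instance matters because the subscriptions live on the event object. If two callers got different instances, a publish on one would never reach handlers subscribed on the other.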

Creating events, with the IEventAggregator service, is as simple as this:

void Example(IEventAggregator events)
{
    var @event = events.GetEvent<TestEvent>();
    // do stuff with the event here ...
}

class TestEvent : EventBase {}

We’ve already seen how GetEvent is implemented. Let’s look next at the EventBase class, which is the abstract base used for all our concrete event types.

Here is what EventBase looks like:

public abstract class EventBase 
{
    protected readonly IList<IEventSubscription> _subscriptions;
    protected readonly object _sync;

    protected EventBase()
    {
        _subscriptions = new List<IEventSubscription>();
        _sync = new object();
    }

    public virtual void Publish(
        params object[] args
        )
    {
        try
        {
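            // Snapshot the live subscriptions under the lock, so a
            // concurrent Subscribe call can't modify the list while
            // we iterate over it.
            List<IEventSubscription> activeSubscriptions;
            lock (_sync)
            {
                activeSubscriptions = _subscriptions.Where(
                    x => x.IsAlive
                    ).ToList();
            }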

            var errors = new List<Exception>();
            foreach (var sub in activeSubscriptions)
            {
                try
                {
                    sub.Invoke(args);
                }
                catch (Exception ex)
                {
                    errors.Add(ex);
                }
            }

            if (errors.Any())
            {
                throw new AggregateException(
                    message: $"Encountered one or more errors while " +
                        $"publishing event: '{GetType().Name}'. See " +
                        $"inner exceptions for more detail.",
                    innerExceptions: errors
                    );
            }
        }
        finally
        {
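            lock (_sync)
            {
                // Find and remove the dead subscriptions while holding
                // the lock, so the list can't change underneath us.
                var deadSubscriptions = _subscriptions.Where(
                    x => !x.IsAlive
                    ).ToList();

                foreach (var deadSub in deadSubscriptions)
                {
                    _subscriptions.Remove(deadSub);
                }
            }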
        }
    }

    public virtual IDisposable Subscribe(
        Action<object[]> action,
        bool strongReference = false
        )
    {
        Guard.Instance().ThrowIfNull(action, nameof(action));

        var subscription = new EventSubscription(
            action,
            strongReference
            );

        lock (_sync)
        {
            _subscriptions.Add(subscription);
        }
        return subscription;
    }

    public virtual IDisposable Subscribe()
    {
        var subscription = new EventSubscription(
            (args) => OnInvoke(args),
            false
            );

        lock (_sync)
        {
            _subscriptions.Add(subscription);
        }
        return subscription;
    }

    public virtual void Unsubscribe(
        IDisposable subscription
        )
    {
        Guard.Instance().ThrowIfNull(subscription, nameof(subscription));
        subscription.Dispose();
    }

    protected virtual void OnInvoke(
        params object[] args
        )
    {
        // TODO : implement in a derived type.
    }
}

The class contains a list of IEventSubscription objects. We’ll look at that type soon. For now, just know that IEventSubscription represents a subscription for a specific event type.

The constructor creates an empty list, along with a synchronization object we’ll use, later, to keep random threads from messing that list up, at runtime.

The Publish method accepts an array of object arguments. Those arguments will, eventually, be passed to our event handlers.

The method starts by filtering out any subscriptions whose targets have been garbage collected since the subscription was created. We do that because there’s really no sense in trying to notify a zombie subscription. Don’t worry, we’ll take care of zombie subscriptions here in a bit.

For each subscription we find, that is still active, we loop through and call Invoke on the target of that subscription. That, in turn, calls our concrete handler – whether that handler is an action, or the OnInvoke method. Either way, we send the notification for the event.

If, while iterating through the list of subscriptions, we encounter any errors, we gather all those up and throw a single AggregateException to the caller. That way, all subscriptions will get processed, whether errors happen or not.
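From the caller's side, that means a failed publish surfaces as one AggregateException wrapping every handler failure. Here is a small standalone sketch of the same collect-then-throw pattern, using plain Action delegates instead of the library's subscription types. The PublishDemo class and its method names are mine, for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PublishDemo
{
    // Mirrors the pattern in Publish: every handler runs, and any
    // failures are reported together once the loop has finished.
    public static void InvokeAll(IEnumerable<Action> handlers)
    {
        var errors = new List<Exception>();
        foreach (var handler in handlers)
        {
            try
            {
                handler();
            }
            catch (Exception ex)
            {
                errors.Add(ex);
            }
        }

        if (errors.Any())
        {
            throw new AggregateException("One or more handlers failed.", errors);
        }
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var handlers = new Action[]
        {
            () => log.Add("first ran"),
            () => throw new InvalidOperationException("second failed"),
            () => log.Add("third ran"),
        };

        try
        {
            InvokeAll(handlers);
        }
        catch (AggregateException ex)
        {
            // Each handler failure shows up as one inner exception.
            foreach (var inner in ex.InnerExceptions)
            {
                log.Add($"error: {inner.Message}");
            }
        }
        return log;
    }
}
```

In this sketch the third handler still runs even though the second one throws, and the caller sees the failure only after all handlers have had their turn.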

Finally, after we’ve notified any interested subscriptions, we then go back and look for any subscriptions that are no longer active (have been garbage collected). We then remove any zombies we find from the list of subscriptions, so they won’t clog up the list, over time.

The Subscribe method has two overloads. The first accepts an Action parameter, and a bool. The Action parameter is the code we’ll call whenever the event is published. The bool indicates, to the event aggregator, whether we should maintain a strong reference to the object containing the action, or not.

The second Subscribe flavor, as shown above, takes no parameters and always uses a weak reference. When a subscription is created using this method, the OnInvoke method, on the concrete event class, will be called whenever the event is published. Simply override that method, in your concrete event class, to perform work for that event type. This gives us two completely different ways of dealing with events and handlers – whether we want the logic for the handler near where we’re using the IEventAggregator object, or not.
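The two handler styles can be sketched side by side. To keep the example short and self-contained, the EventBase below is a simplified stand-in (strong references only, no locking, no zombie cleanup), and FileSavedEvent is a hypothetical event type, so treat this as an illustration of the idea rather than the library's code.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for EventBase: just enough to show both styles.
public abstract class EventBase
{
    private readonly List<Action<object[]>> _handlers =
        new List<Action<object[]>>();

    // Style 1: the caller supplies the handler.
    public void Subscribe(Action<object[]> action) => _handlers.Add(action);

    // Style 2: the event handles itself, through OnInvoke.
    public void Subscribe() => _handlers.Add(args => OnInvoke(args));

    public void Publish(params object[] args)
    {
        foreach (var handler in _handlers.ToArray())
        {
            handler(args);
        }
    }

    protected virtual void OnInvoke(params object[] args) { }
}

// Hypothetical event that carries its own handling logic.
public sealed class FileSavedEvent : EventBase
{
    public List<string> Log { get; } = new List<string>();

    protected override void OnInvoke(params object[] args)
    {
        Log.Add($"saved: {string.Join(",", args)}");
    }
}

public static class HandlerStylesDemo
{
    public static List<string> Run()
    {
        var ev = new FileSavedEvent();
        ev.Subscribe();                                      // routed to OnInvoke
        ev.Subscribe(args => ev.Log.Add("inline handler"));  // routed to the lambda
        ev.Publish("report.txt");
        return ev.Log;
    }
}
```

The override keeps the handling logic with the event type, which is handy when every consumer should react the same way. The lambda keeps it next to the code that subscribes.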

There is also an Unsubscribe method. This method simply takes the indicated subscription and disposes of it. Once that’s done, the trimming logic in the Publish method will, eventually, remove the zombie subscription from the list. It isn’t required to call this method. It’s a courtesy for those times when a using block is not practical.

There is a design seam, of sorts, between the EventBase class and the code that actually handles invoking handlers, at runtime. That design seam is the IEventSubscription interface. My thinking is, I might eventually need to support multiple types of subscriptions, so I added the interface for better isolation. The IEventSubscription interface looks like this:

public interface IEventSubscription : IDisposable
{
    bool IsAlive { get; }

    void Invoke(
        params object[] args
        );
}

The IsAlive property is how the EventBase determines when a subscription turns into a brain eating zombie. Or, in other words, when the object associated with the subscription is garbage collected, by the .NET runtime.

The Invoke method is exactly what it looks like, a method to invoke the handler action associated with the event subscription.

The default implementation of the IEventSubscription interface is the EventSubscription class. That class looks like this:

public class EventSubscription : IEventSubscription
{
    private object? _innerReference;

    public bool IsAlive
    {
        get
        {
            if (_innerReference is WeakReference)
            {
                return ((WeakReference)_innerReference).IsAlive;
            }
            else
            {
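                // A disposed strong subscription has no delegate left,
                // so report it as dead and let Publish trim it.
                return _innerReference != null;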
            }
        }
    }
        
    public EventSubscription(
        Action<object[]> action,
        bool strongReference
        )
    {
        Guard.Instance().ThrowIfNull(action, nameof(action));
        if (strongReference)
        {
            _innerReference = action;
        }
        else
        {
            _innerReference = new WeakReference(
                action
                );
        }
    }

    public virtual void Invoke(
        params object[] args
        )
    {
        if (_innerReference is WeakReference)
        {
            (((WeakReference)_innerReference).Target as Action<object[]>)?.Invoke(
                args
                );
        }
        else
        {
            ((Action<object[]>?)_innerReference)?.Invoke(args);
        }                
    }

    public void Dispose()
    {
        if (_innerReference is WeakReference)
        {
            ((WeakReference)_innerReference).Target = null;
        }
        else
        {
            _innerReference = null;
        }
    }
}

In the IsAlive property, we see that we’re checking to see whether we hold a strong or weak reference to the target object. If it’s a strong reference, garbage collection isn’t a concern, since the delegate can never be collected while we hold it. If it’s a weak reference, we then defer to that object’s IsAlive property.

The constructor accepts the Action delegate for the handler. The bool parameter is used to determine whether that action reference is directly held, or wrapped in a WeakReference object first, then held.

The Invoke method again checks for whether we hold a weak or strong reference to the target object. Once that’s done, the target action is invoked, passing in whatever arguments are present.

The Dispose method just removes the reference to the target of the delegate. That’s really all the cleanup we need to bother with.

The only other code in the CG.Events library is for registering the IEventAggregator type with the DI container, in a hosted environment. That code looks like this:

public static partial class ServiceCollectionExtensions
{
    public static IServiceCollection AddEventAggregation(
        this IServiceCollection serviceCollection
        )
    {
        Guard.Instance().ThrowIfNull(serviceCollection, nameof(serviceCollection));

        serviceCollection.AddSingleton<IEventAggregator, EventAggregator>();
        return serviceCollection;
    }
}

Not much going on there, just a bit of code to register the concrete EventAggregator type as a singleton service, with the DI container.

So how does one go about using this contraption of mine? Easy! Here’s a simple demo with no DI stuff to get in the way:

class Program
{
    static void Main(string[] args)
    {
        var ea = new EventAggregator();
        var ev1 = ea.GetEvent<Test1Event>();

        using (ev1.Subscribe((args) => 
        { 
            Console.WriteLine($"Event 1 handler called."); 
        }))
        {
            ev1.Publish(); 
        }
    }
}

class Test1Event : EventBase { }

Here we manually create an event aggregator. Then we use that to create our Test1Event object. Then we subscribe using a delegate that prints “Event 1 handler called.” whenever the event is published.

Finally, we publish the event using the Publish method, on the Test1Event object.

Want more? Here’s another example using a hosted environment:

class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().RunDelegate((host, token) =>
        {
            var events = host.Services.GetRequiredService<IEventAggregator>();
            var @event = events.GetEvent<TestEvent>();

            @event.Subscribe((args) => 
            {
                Console.WriteLine(
                    $"action called at: {DateTime.Now}. Args: " +
                    $"'{string.Join(",", args)}'");
            });

            @event.Publish("a", "b", "c");
        });
    }
    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                services.AddEventAggregation(); 
            });
}

class TestEvent : EventBase  {  }

This example creates a standard IHost object, then calls the RunDelegate method to run our little example as a simple delegate. Notice that we call the AddEventAggregation method, to register our IEventAggregator type with the DI container.

Inside the delegate for the host, we get the event aggregator from the host’s services. Then we create the event object using the event aggregator. Then we subscribe to the event. Finally, we publish the event.

That’s really all I have for this library right now. It’s all super new code, so expect it to wiggle around on the plate for a bit before it settles down. Eventually, I may come back and add things like the ability to process events in a background thread. I left that out for now.

I deliberately left out Windows synchronization since, in my opinion, that’s really outside the scope of an event aggregator anyway. If you want to use this tool in a WinForms or WPF project, you’ll need to address synchronization issues on your own. If you’re a WinForms or WPF programmer, synchronization should probably be second nature to you, anyway.
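For anyone who wants a starting point, one common way to handle that on your own is to wrap the handler so it gets posted to a SynchronizationContext captured on the UI thread. The UiMarshal helper below is a hypothetical sketch of that idea. It is not part of the library, and it only uses the standard System.Threading.SynchronizationContext class.

```csharp
using System;
using System.Threading;

public static class UiMarshal
{
    // Wraps a handler so that, when invoked, it is posted to the given
    // SynchronizationContext instead of running on the publishing thread.
    public static Action<object[]> OnContext(
        SynchronizationContext context,
        Action<object[]> handler)
    {
        if (context == null) throw new ArgumentNullException(nameof(context));
        if (handler == null) throw new ArgumentNullException(nameof(handler));

        // Post is asynchronous: Publish returns without waiting for the
        // handler, and exceptions thrown by the handler will not flow
        // back into the AggregateException raised by Publish.
        return args => context.Post(_ => handler(args), null);
    }
}
```

In a WinForms or WPF app, the idea would be to grab SynchronizationContext.Current on the UI thread and then subscribe with something like ev.Subscribe(UiMarshal.OnContext(uiContext, args => { /* touch controls here */ }), strongReference: true). The strong reference matters in this case: the wrapper returned by OnContext is a brand new delegate, and a weak subscription would leave nothing else holding onto it.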

Finally, no doubt the source for this library has changed some since I wrote this article. After all, the whole concept is still a work in progress. If you want to use the tool then, hopefully, you’ll bear with any changes I make.

Hope you enjoyed the article. Hope you enjoy the event aggregator.

Photo by Anthony DELANOIX on Unsplash