Message Bus – Part 2

In my last article I covered the overall architecture for a very simple messaging component that lives in my NuGet package named CG.Bus. That package exposes a type named IBus, which can be used to perform very simple send/receive operations in a technology-agnostic fashion.

This time I thought I might build on my existing architecture to create extensions for Rabbit MQ, which is a popular messaging system. In order to integrate with Rabbit MQ we’ll need custom implementations of the IBusProvider, IBusSetup and IBusStrategy types that we looked at in the previous article. Let’s start with our strategy class. Here is a look at the simplified source for that:

 

The RabbitBusStrategy class derives from BusStrategyBase, which is a part of my builder library. The class contains two properties:

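  1. Model, which contains an IModel object from the Rabbit MQ client library. Despite the name, an IModel represents a channel rather than a queue: it is a lightweight session that runs over the connection, and it is the object we use to declare queues and to publish and fetch messages.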
  2. Connection, which contains an IConnection object from the Rabbit MQ client library. This object represents the connection to the underlying Rabbit MQ messaging system.

The class has a single constructor that accepts two parameters, product and provider. These parameter values are supplied by the builder when the strategy instance is created, at runtime. The constructor reads the setup object from the Setup property, which is a property on the BusStrategyBase class, and uses it to get the host name. The host name is needed in order to create an instance of the ConnectionFactory object, which is how the Rabbit MQ client library creates a new connection object. The connection object is then created and stored in the Connection property. After a connection exists, we use it to create a model instance and that object is stored in the Model property. Finally, we call QueueDeclare, on the Model object, to tell Rabbit MQ which queue we are interested in working with.
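To make that sequence a little more concrete, here is a minimal sketch of the same steps using the RabbitMQ.Client package. I'm assuming version 6.x of the client, where the channel type is still named IModel. The ChannelSketch class, its method names, and the flag values passed to QueueDeclare are hypothetical and chosen only for illustration; they are not the code from CG.Bus.Rabbit.

```csharp
using RabbitMQ.Client;

// Hypothetical helper, for illustration only (not part of CG.Bus.Rabbit).
public static class ChannelSketch
{
    // The factory is how the client library creates connections.
    public static ConnectionFactory CreateFactory(string hostName)
    {
        return new ConnectionFactory { HostName = hostName };
    }

    // Opens a connection, creates a channel, then declares the queue.
    // Calling this requires a reachable Rabbit MQ broker.
    public static (IConnection Connection, IModel Model) Open(
        string hostName, string queueName)
    {
        IConnection connection = CreateFactory(hostName).CreateConnection();
        IModel model = connection.CreateModel();

        // Assumed flag values. If a queue with this name and these
        // settings already exists, the declaration changes nothing.
        model.QueueDeclare(
            queue: queueName,
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        return (connection, model);
    }
}
```

A real strategy would store the two returned objects in its Connection and Model properties, as described above.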

The next method is called SendAsync, and is called by the BusProduct to send a message through the message bus. The first thing we do is call the ToJson extension method to convert the object to a JSON representation. ToJson is part of my CG.Json library and is simply a thin wrapper around the Newtonsoft JSON library. Once we have the JSON we convert it to a UTF8 byte array and publish that data to the underlying messaging system through the Model object, using the BasicPublish method.
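As a rough sketch of that send path, the snippet below calls Newtonsoft's JsonConvert directly in place of the ToJson wrapper. It assumes RabbitMQ.Client 6.x. The SendSketch class and its method names are hypothetical, and publishing through the default exchange (the empty string), with the queue name as the routing key, is my assumption about how a simple point-to-point send would be wired up.

```csharp
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;

// Hypothetical helper, for illustration only (not part of CG.Bus.Rabbit).
public static class SendSketch
{
    // Object -> JSON text -> UTF-8 bytes.
    public static byte[] Encode(object message)
    {
        string json = JsonConvert.SerializeObject(message);
        return Encoding.UTF8.GetBytes(json);
    }

    // Publishes the encoded message. The default exchange ("") routes
    // a message to the queue whose name equals the routing key.
    public static void Send(IModel model, string queueName, object message)
    {
        model.BasicPublish(
            exchange: "",
            routingKey: queueName,
            basicProperties: null,
            body: Encode(message));
    }
}
```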

The next method is called ReceiveAsync, and is also called by the BusProduct to receive a message through the message bus. The first thing we do is get the queue name from the Setup and then call the BasicGet method on the Model object to fetch any message that’s waiting in the underlying message queue. If the BasicGet method returns null we simply exit. On the other hand, if we get something back from the message queue then we convert the data back into a JSON string and then call the FromJson method to convert the JSON back into an object reference. FromJson is another part of my CG.Json library, and again, is simply a thin layer around the Newtonsoft JSON library. Once we’ve converted the JSON back to an object, we return that value from the method.
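The receive path can be sketched the same way. Again this assumes RabbitMQ.Client 6.x (where the message body is a ReadOnlyMemory of bytes) and uses Newtonsoft's JsonConvert directly in place of the FromJson wrapper. The ReceiveSketch class, the generic type parameter, and the choice of autoAck: true are all assumptions made for illustration.

```csharp
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;

// Hypothetical helper, for illustration only (not part of CG.Bus.Rabbit).
public static class ReceiveSketch
{
    // UTF-8 bytes -> JSON text -> object.
    public static T Decode<T>(byte[] body) where T : class
    {
        string json = Encoding.UTF8.GetString(body);
        return JsonConvert.DeserializeObject<T>(json);
    }

    // Polls the queue once. BasicGet returns null when no message
    // is waiting, in which case we return null as well.
    public static T Receive<T>(IModel model, string queueName) where T : class
    {
        // autoAck: true (assumed) tells the broker to treat the
        // message as acknowledged as soon as it is delivered.
        BasicGetResult result = model.BasicGet(queueName, autoAck: true);
        if (result == null)
        {
            return null;
        }
        return Decode<T>(result.Body.ToArray());
    }
}
```

Note that BasicGet is a one-shot poll, so a caller that wants a steady stream of messages has to call the receive method repeatedly.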

The only other method on the strategy is the Dispose method. Here we have a few lines of code to clean up the connection and model objects that were created in the constructor.
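For completeness, here is one way that kind of cleanup could look. The ChannelOwnerSketch class is hypothetical; the only things it relies on from RabbitMQ.Client are that IModel and IConnection both implement IDisposable.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical owner type, for illustration only (not part of CG.Bus.Rabbit).
public sealed class ChannelOwnerSketch : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _model;
    private bool _disposed;

    public ChannelOwnerSketch(IConnection connection, IModel model)
    {
        _connection = connection;
        _model = model;
    }

    public bool IsDisposed => _disposed;

    public void Dispose()
    {
        // Guard so that calling Dispose twice is harmless.
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        // Release the channel first, then the connection it runs over.
        _model?.Dispose();
        _connection?.Dispose();
    }
}
```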

Generally speaking, strategy objects are created through a provider object. In our case we create the RabbitBusStrategy objects through a RabbitBusStrategyProvider instance. Here is a look at the simplified source for that class:

 

Not a lot going on here. We simply create a new RabbitBusStrategy object and return it.

The next abstraction we should cover is the IRabbitBusStrategySetup interface, which is the type that we’ll return to the caller. This type has methods to allow a caller to configure the strategy before the builder tries to create an instance. Here is a look at the simplified source for that interface:

 

There are 3 settings on this setup: (1) TopicName, which I’m not using yet but hope to use one day to support a publish/subscribe approach. (2) HostName, which is just the name of the underlying messaging system’s host. And finally, (3) QueueName, which is just that, the name of the messaging queue we want to work with.

The interface uses set/get methods, rather than properties, so that the methods can be chained, fluent style, and also so we can override the parameters, if needed.

The IRabbitBusStrategySetup interface is implemented by the RabbitBusStrategySetup. Here is the simplified code for that class:

 

The RabbitBusStrategySetup class derives from the BusStrategySetupBase class, which is part of my builder library. It also implements the IRabbitBusStrategySetup interface, which we just looked at. I won’t go into detail on this listing since each method is so trivial. In each case though, the methods either set or get a property on the setup.
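To illustrate the fluent set/get style described above, here is a small self-contained sketch. The ISetupSketch and SetupSketch types and their method names are hypothetical stand-ins, not the real IRabbitBusStrategySetup members; the point is only that each setter returns the setup itself so that calls can be chained.

```csharp
// Hypothetical stand-in types, for illustration only.
public interface ISetupSketch
{
    ISetupSketch SetHostName(string hostName);
    string GetHostName();
    ISetupSketch SetQueueName(string queueName);
    string GetQueueName();
}

public class SetupSketch : ISetupSketch
{
    private string _hostName = string.Empty;
    private string _queueName = string.Empty;

    // Each setter stores the value and returns 'this' so that the
    // next call can be chained directly onto the result.
    public ISetupSketch SetHostName(string hostName)
    {
        _hostName = hostName;
        return this;
    }

    public string GetHostName() => _hostName;

    public ISetupSketch SetQueueName(string queueName)
    {
        _queueName = queueName;
        return this;
    }

    public string GetQueueName() => _queueName;
}
```

With types like these, configuration reads as a single expression, for example new SetupSketch().SetHostName("localhost").SetQueueName("orders").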

As usual, I tie the various abstractions together and integrate them with the BusBuilder through the use of a simple extension method. Here is the simplified code for that:

 

In this method we create a setup and add it to the builder’s Setup collection, then we return the setup reference so that the caller can handle any configuration steps that are required by the strategy.
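The general shape of such an extension method can be sketched with stand-in types. Everything named below (BuilderSketch, RabbitSetupSketch, AddRabbitSketch) is hypothetical and exists only so the example compiles on its own; the real BusBuilder and setup types live in my CG.Bus packages and will differ in detail.

```csharp
using System.Collections.Generic;

// Stand-in for the real builder; it only tracks registered setups.
public class BuilderSketch
{
    public List<object> Setups { get; } = new List<object>();
}

// Stand-in for the real Rabbit setup type.
public class RabbitSetupSketch
{
    public string HostName { get; set; }
    public string QueueName { get; set; }
}

public static class BuilderSketchExtensions
{
    // Creates a setup, registers it with the builder, and returns it
    // so that the caller can carry on configuring it.
    public static RabbitSetupSketch AddRabbitSketch(this BuilderSketch builder)
    {
        var setup = new RabbitSetupSketch();
        builder.Setups.Add(setup);
        return setup;
    }
}
```

Because the method returns the same setup instance that it registered, anything the caller configures afterwards is visible to the builder when it later creates the strategy.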

So, how would we use this in a project? Let’s look at that now:

 

So, not too complicated. We create a builder, use our extension method to inject our custom RabbitBusStrategySetup instance into that builder, then we call a couple of methods on the resulting setup object to configure the host name and the queue name. Finally, we call the Build method on the builder to actually create the IBus object.

I hope this article proves useful to someone out there. If nothing else, I hope it inspires someone to tackle their own IBus implementation for Rabbit MQ.

Next time I’ll cover another implementation, one that integrates with MSMQ. See ya then!

The code for this article is part of my NuGet package CG.Bus.Rabbit, which can be downloaded for free at https://www.nuget.org/packages/CG.Bus.Rabbit

 

The source code for the CG.Bus.Rabbit project lives on GitHub and can be obtained for free at

https://github.com/CodeGator/CG.Bus.Rabbit

 

 

Photo by Phil Becker on Unsplash