Message Bus – Part 3

In my last article I covered a Rabbit MQ extension to my IBus library. This time I thought I might cover the same kind of extension except for the MSMQ messaging system. Since this extension is so much like the last one I may skim over some of the code that’s very similar to my last article and just focus on the bits of code that are remarkably different. That should make this article go a little faster than the last one and avoid too much duplication.

Just like last time, we’ll need custom implementations of the IBusProvider, IBusSetup and IBusStrategy types. Let’s start with our strategy class. Here is a look at the simplified source for that:

 

The MicrosoftBusStrategy class derives from BusStrategyBase, which is a part of my builder library. The class contains a single property:

  1. Queue, which contains a MessageQueue object from the MSMQ client library.

The class has a single constructor that accepts a two parameters, product and provider. These parameter values are supplied by the builder when it creates the provider instance, at runtime. The constructor gets the queue name from the Setup property, which is a property on the BusStrategyBase class. It then uses that information to check to see if the queue already exists within MSMQ. If not, then the queue is created. After that, an instance of MessageQueue is created and tied to the specified queue name.

The next method is called SendAsync, and is called by the BusProduct to send a message through the message bus. The first thing we do is call the ToJson extension method to convert the object to a JSON representation. ToJson is part of my CG.Json library and is simply a thing wrapper around the NewtonSoft JSON library. Once we have the JSON send it through the message bus using the Send message on the MessageQueue object.

The next method is called ReceiveAsync, and is also called by the BusProduct to receive a message through the message bus. The first thing we do is get the queue name from the Setup and then call the Receive method on the MessageQueue object to fetch any messages that might be waiting in the underlying message queue. If the Receive method returns NULL we simply exist. On the other hand, if we get something back from the message queue then we specify a custom message formatter object and then return the value of the Body property, on the message. When we read the Body property, it triggers the MSMQ library to call our custom JsonMessageFormatter class to convert the message body from JSON, back into the original object. We’ll look at the JsonMessageFormatter class in closer detail below.

The only other method on the strategy is the Dispose method. Here we have a few lines of code to cleanup the connection and model objects that were created in the constructor.

Here is the simplified code for the JsonMessageFormatter class, which is used within the MicrosoftBusStrategy to convert messages back into their original object type:

 

The JsonMessageFormatter class must implement the ICloneable and IMessageFormatter interfaces, in order to properly function with the MSMQ client library.

The first method on the class is called CanRead and is used to determine if MSMQ should allow a read of the body of a message. We start be getting the stream for the message body, which we then perform some checks on, in order to determine if the stream can be read from.

The next method is named Clone and is used to obtain a copy of the message formatter. All we do in this message is create an additional instance of JsonMessageFormatter and return it.

The next method is named Read, and is used to read the contents of a message. Here we first open a stream reader on the message body’s stream, using UTF8 encoding, since we know we’re going to read JSON. Afer that we read to the end of the stream and put the results into a string named json. Then we use a regular expression to strip out any parts of the JSON string that don’t represents the actual body of the message. From there, we take the parsed value, which is still just JSON, and run it through the FromJson extension method to convert it back into an object reference.

The last method is named Write and is used to write an object out as JSON and embed that JSON into the message body. Here we convert the object to JSON using the ToJson extension method, then we grab the bytes for that string using the UTF8 encoding scheme. From there, we wrap the bytes into a memory string and pass that reference into the message using the BodyStream property.

Generally speaking, strategy objects are created through a provider object. In our case we create MicrosoftBusStrategy objects through a MicrosoftBusStrategyProvider instance. Here is a look at the simplified source for that class:

 

Not a lot going on here. We simply create a new MicrosoftBusStrategy object and return it.

The next abstraction we should cover is the IMicrosoftBusStrategySetup interface, which is the type that we’ll return to the caller. This type has methods to allow a caller to configure the strategy before the builder tries to create an instance. Here is a look at the simplified source for that interface:

 

There is only 1 property on this setup: QueueName, which is just that, the name of the messaging queue we want to work with.

The interface uses set/get methods, rather than properties, so that the methods can be chained, fluent style, and also so we can override the parameters, if needed.

The MicrosoftBusStrategySetup interface is implemented by the MicrosoftBusStrategySetup. Here is the simplified code for that class:

 

The MicrosoftBusStrategySetup class derives from the BusStrategySetupBase class, which is part of my builder library. Is also implements the IMicrosoftBusStrategySetup interface, which we just looked at. I won’t go over into detail on this listing since it’s pretty trivial. We simply set or get the QueueName property on the setup.

As usual, I tie the various abstractions together and integrate them with the BusBuilder through the use of a simple extension method. Here is the simplified code for that:

 

In this method we create a setup and add it to the builder’s Setup collection, then we return the setup reference so that the caller can handle any configuration steps that are required by the strategy.

So, how would we use this in a project? Let’s look at that now:

 

In my example we create a builder, use our extension method to inject our custom MicrosoftBusStrategySetup instance into that builder, then we call the method to set the queue name on the resulting setup object. Finally, we call the Build method on the builder to actually create the IBus object.

I hope this article proves useful to someone out there. If nothing else, I hope it inspires someone to tackle their own IBus implementation. Thanks for reading!

The code for this article is part of my NUGET package CG.Bus.Microsoft, which can be downloaded for free at https://github.com/CodeGator/CG.Bus.Microsoft

 

The source code for the CG.Bus.Microsoft project lives on Github and can be obtained for free at

https://www.nuget.org/packages/CG.Bus.Microsoft