Implement an event-driven microservice architecture on Kubernetes with RabbitMQ using MassTransit Part — 2

Darshana Dinushal
7 min read · Sep 18, 2021

In an event-driven microservice architecture, services communicate with each other using event messages. When business events occur, producers publish them as messages, and other services consume them through event listeners. The main benefits of event-driven systems are asynchronous behavior and loose coupling.

In this article, we are going to talk about a simple publisher/subscriber scenario deployed on an OpenShift cluster. To implement the publisher and subscriber, we use MassTransit.

Prerequisites & Setup

You need basic knowledge of the Kubernetes architecture and its components, and you need a running Kubernetes cluster. If you are using OpenShift Container Platform, you should have basic knowledge of OpenShift (see Provision Red Hat OpenShift Cluster On AWS), or you can use AWS ROSA or KMS.

Install a RabbitMQ cluster on Kubernetes or OpenShift. To set up the RabbitMQ cluster, read my previous article, Deploying RabbitMQ Cluster On Kubernetes (Part 1), or use an Amazon MQ RabbitMQ broker on AWS, which provisions a RabbitMQ broker quickly for you.

You should also have a basic understanding of RabbitMQ, ASP.NET Core Web API, and .NET Core.

Source code

You can download the source code from my Git Repo.

Use Case

Assume that we have an application that lets a user create an appointment. After the appointment is created successfully, the publisher publishes an email message to notify other parties. The consumer then consumes that message and sends the email to the end user.

If a failure or exception occurs while the consumer is processing the message, the system should retry. If the retries are unsuccessful, the message is moved to the error queue.

MassTransit

MassTransit is a free, open-source, lightweight, .NET-based Enterprise Service Bus (ESB). MassTransit helps Microsoft developers route messages over MSMQ, RabbitMQ, and ActiveMQ service buses, with native support for MSMQ and RabbitMQ.

What does MassTransit add on top of RabbitMQ?

Threaded Consumers: Multiple concurrent consumers are possible, and MassTransit manages them for you. Message handling is also asynchronous (the message pipeline in MassTransit is asynchronous).

Exception Management: MassTransit implements generic exception handling for consumers. Failed messages are moved to an error queue, where we can inspect them later and re-queue them.

Retries & Poison Messages: If a consumer throws an exception, MassTransit uses a retry policy to redeliver the message to the consumer. If the retries reach the maximum retry count because of continued failures, MassTransit moves the message to an error queue.

Transactions: MassTransit manages the message transaction and can also manage the database transaction. It supports Entity Framework integration as well.

Serialization: MassTransit provides a number of serializers, including BSON, JSON, XML, and Binary.

Routing: MassTransit provides a heavily production-tested convention for using RabbitMQ exchanges to route published messages to subscribed consumers.

Unit Testability: MassTransit provides a TestFramework NuGet package.

Tracing and Monitoring: Using the tracing functionality, you can get very detailed timings of when and where things were consumed, how long the receive took, how long the consume took, and what exceptions were thrown.

Sagas: Sagas are initiated by an event, orchestrate events, and maintain the state of the overall transaction. They are designed to manage the complexity of a distributed transaction without locking and immediate consistency. They manage state and track any compensations required if a partial failure occurs.

Publisher Application Implementation

1.) Create an ASP.NET Core 3.1 Web API application using Visual Studio.

I am not going to describe in detail how to create a .NET Core application here. You can download the source code from my GitHub repository, as mentioned above.

After creating the application, install the following NuGet packages.

2.) Add the RabbitMQ connection string to appsettings.json.

We can provide the connection string in two different ways, as follows.

3.) Register the RabbitMQ and MassTransit dependencies.

Here I register all dependencies through extension methods called from the ConfigureServices method in Startup.cs. Here we can also specify the ExchangeType (Direct, Topic, Fanout, or Headers), which is basically a routing rule for the messages. Messages are not published directly to a queue; instead, the producer sends messages to an exchange.
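As a rough sketch of what this registration might look like, assuming MassTransit 7.x with the MassTransit.RabbitMQ and MassTransit.AspNetCore packages: the extension method name AddRabbitMqPublisher, the EmailMessage contract, and the RabbitMq:* configuration keys are illustrative assumptions, not names taken from the repository.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client; // ExchangeType string constants ("fanout", "direct", ...)

// Hypothetical message contract used only for this illustration.
public class EmailMessage
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

public static class PublisherRegistrationExtensions
{
    // Hypothetical extension method, intended to be called from ConfigureServices.
    public static IServiceCollection AddRabbitMqPublisher(
        this IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumed keys; the host value is expected to look like "rabbitmq://my-host/".
                cfg.Host(new Uri(configuration["RabbitMq:Host"]), h =>
                {
                    h.Username(configuration["RabbitMq:Username"]);
                    h.Password(configuration["RabbitMq:Password"]);
                });

                // Choose the exchange type used when EmailMessage is published.
                cfg.Publish<EmailMessage>(p => p.ExchangeType = ExchangeType.Fanout);
            });
        });

        // Starts and stops the bus with the ASP.NET Core host (MassTransit 7.x API).
        services.AddMassTransitHostedService();
        return services;
    }
}
```

Fanout is MassTransit's default exchange type for published messages, so the Publish call matters only when a different type such as Direct or Topic is needed.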

4.) Create a ProducerService class to publish messages.

A publish endpoint lets the underlying transport determine the actual endpoint to which the message is sent; on RabbitMQ, for example, that endpoint is an exchange.
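A minimal sketch of such a service, assuming MassTransit's IPublishEndpoint is injected by the container: the EmailMessage contract and the PublishEmailAsync method name are hypothetical, and the class in the repository may be shaped differently.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract used only for this illustration.
public class EmailMessage
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

public class ProducerService
{
    private readonly IPublishEndpoint _publishEndpoint;

    // IPublishEndpoint is registered by AddMassTransit and resolved per scope.
    public ProducerService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    // Publishes the message; the transport (here, a RabbitMQ exchange)
    // decides which queues receive a copy.
    public Task PublishEmailAsync(EmailMessage message, CancellationToken cancellationToken = default)
    {
        return _publishEndpoint.Publish(message, cancellationToken);
    }
}
```

The producer never names a queue. Any consumer bound to the exchange for EmailMessage receives the message.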

5.) Create a controller class with a Post method to expose the publisher service to the outside.
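One way this controller could look is sketched below. The route, the IProducerService abstraction, and the EmailMessage contract are assumptions made for the example, not the repository's actual names.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

// Hypothetical message contract used only for this illustration.
public class EmailMessage
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

// Hypothetical abstraction over the publishing service from the previous step.
public interface IProducerService
{
    Task PublishEmailAsync(EmailMessage message, CancellationToken cancellationToken);
}

[ApiController]
[Route("api/[controller]")]
public class AppointmentController : ControllerBase
{
    private readonly IProducerService _producer;

    public AppointmentController(IProducerService producer)
    {
        _producer = producer;
    }

    // POST api/appointment: publishes the email message and returns 202,
    // because the email itself is sent later by the consumer.
    [HttpPost]
    public async Task<IActionResult> Post([FromBody] EmailMessage message)
    {
        await _producer.PublishEmailAsync(message, HttpContext.RequestAborted);
        return Accepted();
    }
}
```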

6.) Add a Dockerfile using Visual Studio Docker Support, then build the image and publish it to Docker Hub or any other Docker image repository.

Build Docker Image

docker build -t <dockerhubId>/rabbitmq-publisher:1.0.0 -f microservices/Sample.Application.Publisher/Dockerfile .

Docker push

docker push <dockerhubId>/rabbitmq-publisher:1.0.0

7.) Deploy the publisher application to Kubernetes or OpenShift.

If you are using a Kubernetes cluster, you can deploy this image using the kubectl command. If you are using OpenShift, you can use S2I (source-to-image).

Consumer Application Implementation

1.) Create an ASP.NET Core 3.1 Web API application and add the NuGet packages using Visual Studio.

Delete the UseRouting, UseAuthorization, and UseEndpoints calls from the Configure method in Startup.cs.

After creating the application, add the following NuGet packages.

2.) Add the RabbitMQ connection string and email credentials to appsettings.json.

3.) Register the RabbitMQ and MassTransit dependencies.

MassTransit Middleware

MassTransit is built on top of GreenPipes, which is used to create a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters. For a detailed view of MassTransit's receive pipeline, read this document.

4.) RabbitMQ Consumer Message Filter

Scoped filters are typically used to transfer data to and from the consumer. This data may be extracted from headers, or could include context or authorization information that needs to be passed from a consumed message context to sent or published messages.

Here we validate the message object before it is consumed. The IsValidateEmail method of the "RegexUtilities" class validates the email address; if the email is invalid, it throws an exception.
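The filter described above could be sketched roughly as follows, assuming MassTransit 7.x (where IFilter, IPipe, and ProbeContext live in the GreenPipes namespace). The EmailMessage contract and the inline regular expression are stand-ins for illustration; the repository's RegexUtilities.IsValidateEmail may apply different rules.

```csharp
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;

// Hypothetical message contract used only for this illustration.
public class EmailMessage
{
    public string To { get; set; }
}

// Generic consume filter: runs before the consumer for every message type,
// and validates the address when the message is an EmailMessage.
public class EmailValidationFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    // Deliberately simple stand-in pattern: something@something.something
    private static readonly Regex EmailPattern =
        new Regex(@"^[^@\s]+@[^@\s]+\.[^@\s]+$", RegexOptions.Compiled);

    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        if (context.Message is EmailMessage email &&
            (string.IsNullOrWhiteSpace(email.To) || !EmailPattern.IsMatch(email.To)))
        {
            // Throwing here stops the pipeline, so the consumer is never invoked.
            throw new ArgumentException($"Invalid email address: '{email.To}'");
        }

        // Valid (or not an email message): pass control to the next filter or the consumer.
        return next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("emailValidation");
    }
}
```

In MassTransit 7.x, a generic filter like this is typically attached inside UsingRabbitMq with cfg.UseConsumeFilter(typeof(EmailValidationFilter<>), context).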

MassTransit Retry

Some exceptions may be caused by a transient condition, such as a database deadlock, a busy web service, or some similar type of situation which usually clears up on a second attempt. With these exception types, it is often desirable to retry the message delivery to the consumer, allowing the consumer to try the operation again.

When configuring message retry, there are several retry policies available, including None, Immediate, Interval, Intervals, Exponential, and Incremental.

Exception Filters

Sometimes you do not want to always retry, but instead only retry when some specific exception is thrown and fault for all other exceptions. To implement this, you can use an exception filter. Specify exception types using either the Handle or Ignore method. A filter can have either Handle or Ignore statements; combining them has unpredictable effects.
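For illustration, a retry policy restricted to one exception type might be configured as below, assuming MassTransit 7.x. The helper class and the choice of TimeoutException as the transient exception are assumptions made for the example.

```csharp
using System;
using GreenPipes;
using MassTransit;

public static class RetryFilterExample
{
    // Hypothetical helper: applies a retry policy to a receive endpoint.
    public static void ConfigureRetry(IReceiveEndpointConfigurator endpoint)
    {
        endpoint.UseMessageRetry(r =>
        {
            // Retry only when a TimeoutException is thrown;
            // every other exception faults immediately without retrying.
            r.Handle<TimeoutException>();

            // Up to 3 retries, 5 seconds apart.
            r.Interval(3, TimeSpan.FromSeconds(5));
        });
    }
}
```

To retry everything except a known permanent failure, use r.Ignore<TException>() in place of the Handle call rather than alongside it.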

5.) RabbitMQ Consumer ClaimSubmition.

Here we configure RetryCount (3) and Interval (5000) in the appsettings.json file and pass those values to the Interval method. MassTransit will retry three times, 5 seconds apart. If the retries fail, the message is moved to the error queue.
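A sketch of how these settings could be wired into the receive endpoint, assuming MassTransit 7.x: the configuration keys (RabbitMq:*, Retry:RetryCount, Retry:Interval), the method name AddRabbitMqConsumer, and the queue name parameter are hypothetical and may not match the repository.

```csharp
using System;
using GreenPipes;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class ConsumerRegistrationExtensions
{
    // Hypothetical extension method; TConsumer is whatever consumer class the service hosts.
    public static IServiceCollection AddRabbitMqConsumer<TConsumer>(
        this IServiceCollection services, IConfiguration configuration, string queueName)
        where TConsumer : class, IConsumer
    {
        // Assumed keys; the defaults mirror the values in the text (3 retries, 5000 ms).
        var retryCount = configuration.GetValue<int>("Retry:RetryCount", 3);
        var intervalMs = configuration.GetValue<int>("Retry:Interval", 5000);

        services.AddMassTransit(x =>
        {
            x.AddConsumer<TConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(new Uri(configuration["RabbitMq:Host"]), h =>
                {
                    h.Username(configuration["RabbitMq:Username"]);
                    h.Password(configuration["RabbitMq:Password"]);
                });

                cfg.ReceiveEndpoint(queueName, e =>
                {
                    // Retry a failed message retryCount times, waiting intervalMs between attempts.
                    e.UseMessageRetry(r => r.Interval(retryCount, TimeSpan.FromMilliseconds(intervalMs)));
                    e.ConfigureConsumer<TConsumer>(context);
                });
            });
        });

        // Starts and stops the bus with the host (MassTransit 7.x API).
        services.AddMassTransitHostedService();
        return services;
    }
}
```

Once the retries are exhausted, MassTransit moves the message to a queue named after the receive endpoint with an _error suffix.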

6.) RabbitMQ Consumer class.

This is the consumer class. When everything in the MassTransit middleware executes successfully and no exception occurs, the Consume method runs last. Here it calls the SendMail method and sends the email to the end user.
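A minimal consumer along these lines might look like the following. The EmailMessage contract and the IEmailSender abstraction are hypothetical; the repository's consumer calls its own SendMail implementation.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract used only for this illustration.
public class EmailMessage
{
    public string To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

// Hypothetical abstraction over the actual mail-sending code (for example, SMTP).
public interface IEmailSender
{
    Task SendMailAsync(string to, string subject, string body);
}

public class EmailConsumer : IConsumer<EmailMessage>
{
    private readonly IEmailSender _emailSender;

    public EmailConsumer(IEmailSender emailSender)
    {
        _emailSender = emailSender;
    }

    // Called by MassTransit after the filters and retry middleware have let the message through.
    public async Task Consume(ConsumeContext<EmailMessage> context)
    {
        var message = context.Message;

        // An exception thrown here triggers the configured retry policy.
        await _emailSender.SendMailAsync(message.To, message.Subject, message.Body);
    }
}
```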

** Download the source code and customize it according to your needs. The GitHub repository is mentioned above.

7.) Add a Dockerfile using Visual Studio Docker Support, then build the image and publish it to Docker Hub or any other Docker image repository.

Build Docker Image and Docker push

docker build -t <dockerhubId>/rabbitmq-consumer:1.0.0 -f microservices/Sample.Application.Consumer/Dockerfile .

docker push <dockerhubId>/rabbitmq-consumer:1.0.0

Execute Publisher and Consumer

Once the consumer application starts, MassTransit creates the queue and exchange for you. If an error or exception occurs, MassTransit also creates an error queue to hold the failed messages.

Conclusion

In this article, we implemented a simple .NET Core publisher/subscriber application using MassTransit and deployed it on Kubernetes (an OpenShift cluster).

Enjoy!!! stay safe

Darshana Dinushal

Assistant Lead Engineer ( C# .net Core, Angular + Azure DevOps +AWS+ Kubernetes + Docker + microservices )