Messaging solutions are pretty hip lately. Whether you are stuck in an entangled microservices domain, or just a distributed monolith, or you genuinely want to get rid of reliability issues and tight coupling in your application, you will eventually come across messaging as a possible solution.
So let's take a look at how you can integrate a messaging strategy into your ASP.NET project, or any other .NET application for that matter, using MassTransit and the messaging infrastructure provided by AWS. I am not a huge fan of AWS, but since I recently had to integrate it at work, I thought I'd share some of the head scratches it gave me.
Depending on your specific domain or project, there can be various reasons to consider integrating messaging into your solution: decoupling services that currently call each other directly, smoothing out load spikes, or improving reliability when downstream systems are temporarily unavailable.
MassTransit can be thought of as a .NET wrapper for various bus providers out there, such as the popular RabbitMQ, Azure Service Bus, and Amazon SQS. As such, it is similar to the commercial solution [NServiceBus](https://particular.net/nservicebus). While I'll be demonstrating the integration with SQS, the focus of this post is on MassTransit, and many of the concepts apply independently of the underlying technology you use to actually route the messages. As a bonus, if you follow along closely, you don't even need an AWS account.
We'll start from scratch by creating two independent WebApi projects, serving as stand-ins for two distributed applications or services. We are running on .NET 7 and MassTransit 8.0.9 for this.
First, we'll create two ASP.NET Core WebApi projects, WebApi.Producer and WebApi.Consumer.
Second, we need a shared contract library where the messages will later reside, Messaging.Contract, and another class library, Messaging.Core, where we will put shared implementations in order not to duplicate too much code. This is also where we'll start right now.
In Messaging.Core, create ServiceCollectionExtensions.cs:
using System.Reflection;
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class ServiceCollectionExtensions
{
    public static IServiceCollection ConfigureMassTransit(this IServiceCollection services)
    {
        services.AddMassTransit(busConfigurator =>
        {
            // Scan the entry assembly for IConsumer implementations.
            var executingAssembly = Assembly.GetEntryAssembly();
            busConfigurator.AddConsumers(executingAssembly);
            busConfigurator.UsingAmazonSqs((context, config) =>
            {
                // Point the transport at LocalStack instead of the real AWS.
                config.Host(new Uri("amazonsqs://localhost:4566"), h =>
                {
                    h.AccessKey("test");
                    h.SecretKey("test");
                    h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566" });
                    h.Config(new AmazonSQSConfig { ServiceURL = "http://localhost:4566" });
                });
                // Create one receive endpoint per discovered consumer.
                config.ConfigureEndpoints(context);
            });
        });
        return services;
    }
}
Please note that you'll need some additional NuGet packages to make this work, specifically MassTransit, MassTransit.AmazonSQS, and Microsoft.AspNetCore.Http.Abstractions.
There are a couple of things worth noting here: consumers are discovered by scanning the entry assembly, the host points at localhost:4566 (where our LocalStack instance will listen) with dummy credentials, and ConfigureEndpoints automatically creates a receive endpoint for every discovered consumer.
If you're not into distributed applications and just want the advantages of messaging in your single-node application, you can also configure MassTransit to run completely in-memory via busConfigurator.UsingInMemory().
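As an illustration, here is a minimal sketch of such an in-memory setup. The extension method name is my own; everything else mirrors the SQS variant above:

```csharp
using System.Reflection;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Sketch: the same registration pattern as before, but with MassTransit's
// in-memory transport instead of Amazon SQS. No broker or LocalStack needed.
public static class InMemoryMassTransitExtensions
{
    public static IServiceCollection ConfigureInMemoryMassTransit(this IServiceCollection services)
    {
        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.AddConsumers(Assembly.GetEntryAssembly());
            busConfigurator.UsingInMemory((context, config) =>
            {
                // Endpoints are still wired up per consumer, exactly as with SQS.
                config.ConfigureEndpoints(context);
            });
        });
        return services;
    }
}
```

Everything stays in-process, so this is only suitable for a single node; in-flight messages are lost when the application stops.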
To emulate our first producer, we'll create a message in Messaging.Contract like this:
public sealed record HeartBeat
{
    public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
    public Guid Identifier { get; init; } = Guid.NewGuid();
}
For MassTransit to be able to serialize and deserialize the messages on both sides, the types and namespaces must exactly match. You could share this specific project between services via NuGet packages or code generation tools from a different specification language, such as Protobuf.
It is good practice to prepare your messages with an identifier and a timestamp. While you'll receive additional header information when consuming messages, this will still be useful as correlation information.
Next, we'll set up a simple message producer in WebApi.Producer that will regularly send heartbeat messages to anyone who is interested:
using MassTransit;
using Messaging.Contract;

public class HeartBeatSender : IHostedService
{
    private readonly IBus messageBus;
    private readonly ILogger<HeartBeatSender> logger;

    public HeartBeatSender(IBus messageBus, ILogger<HeartBeatSender> logger)
    {
        this.messageBus = messageBus;
        this.logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        while (cancellationToken.IsCancellationRequested == false)
        {
            try
            {
                var heartBeat = new HeartBeat();
                // Publish the instance we just created, then log its timestamp.
                await messageBus.Publish(heartBeat, cancellationToken);
                this.logger.LogInformation("Sent heartbeat {Timestamp}", heartBeat.Timestamp);
                await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
            }
            catch (TaskCanceledException)
            {
                // Expected on shutdown; the loop condition will end the service.
            }
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
My Program.cs now looks something like this:
var builder = WebApplication.CreateBuilder(args);
builder.Logging.AddConsole();
builder.Services.ConfigureMassTransit();
builder.Services.AddHostedService<HeartBeatSender>();
var app = builder.Build();
app.MapGet("/", () => "Hello World!");
app.Run();
Assuming you have Docker installed on your machine, place the following docker-compose.yaml file somewhere and execute docker compose up -d:
version: "3.8"
services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DEBUG=${DEBUG-}
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR-}
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
We are finally ready to run WebApi.Producer. Once you do, you will hopefully see the heartbeats in your console output:
info: MassTransit[0]
Bus started: amazonsqs://localhost/
info: WebApi.Producer.HeartBeatSender[0]
Sent heartbeat 12/17/2022 12:38:42 +00:00
info: WebApi.Producer.HeartBeatSender[0]
Sent heartbeat 12/17/2022 12:38:50 +00:00
Next up, we'll create a consumer class in WebApi.Consumer to receive those messages:
using MassTransit;
using Messaging.Contract;

public class HeartBeatConsumer : IConsumer<HeartBeat>
{
    private readonly ILogger<HeartBeatConsumer> logger;

    public HeartBeatConsumer(ILogger<HeartBeatConsumer> logger)
    {
        this.logger = logger;
    }

    public Task Consume(ConsumeContext<HeartBeat> context)
    {
        logger.LogInformation("Received heartbeat with ID {Id} and timestamp {Timestamp}",
            context.Message.Identifier, context.Message.Timestamp);
        return Task.CompletedTask;
    }
}
and the respective Program.cs:
var builder = WebApplication.CreateBuilder(args);
builder.Logging.AddConsole();
builder.Services.ConfigureMassTransit();
var app = builder.Build();
app.MapGet("/", () => "Hello World!");
app.Run();
And that's already it. Running both applications should now give you the following console output:
info: MassTransit[0]
Configured endpoint HeartBeat, Consumer: WebApi.Consumer.HeartBeatConsumer
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: MassTransit[0]
Bus started: amazonsqs://localhost/
info: WebApi.Consumer.HeartBeatConsumer[0]
Received heartbeat with ID 839aa463-6e6d-4a06-b731-737697467191 and timestamp 12/17/2022 12:44:25 +00:00
info: WebApi.Consumer.HeartBeatConsumer[0]
Received heartbeat with ID 417e1542-2001-47b5-957a-a375b6cf703c and timestamp 12/17/2022 12:44:30 +00:00
Done. Finished. We can actually stop here and be happy, and that's probably what most Hello-World tutorials will do. But as we all know, the devil is in the details; the fun starts when project reality kicks in.
So let's go a bit further than that.
A heartbeat, or similar events like PersonCreated and CoffeeOrdered, is classic PubSub. That means they essentially model a one-to-many relationship.
By default, MassTransit therefore creates a Standard topic in SNS and a Standard endpoint queue in SQS, with a respective subscription to the topic, for us.
These standard types do not guarantee proper message ordering and are delivered to anyone who is interested. You'll want to use these types if your producing application does not care about what someone is doing with the information provided in the message.
But what if you do? What if you actually expect an application to take care of your message (or better: command)?
The means to do this are FIFO topics and queues. They provide the same features as Standard, with the added benefits of strict message ordering and exactly-once delivery.
In order to create topics and queues with these attributes, e.g., to be able to use commands in your application, you'll have to append .fifo to your topic and queue names.
Right. I don't find that very API-friendly either.
In any case, an easy way to do this is to implement a custom topic name and endpoint name formatter.
To make it not too complicated, we will delegate most of the calls to the default implementation:
using MassTransit;

public class CustomFormatter : IEndpointNameFormatter, IEntityNameFormatter
{
    private readonly IEndpointNameFormatter defaultFormatter;

    public CustomFormatter(IEndpointNameFormatter defaultFormatter)
    {
        this.defaultFormatter = defaultFormatter;
    }

    public string TemporaryEndpoint(string tag)
    {
        return defaultFormatter.TemporaryEndpoint(tag);
    }

    public string Consumer<T>() where T : class, IConsumer
    {
        var defaultName = defaultFormatter.Consumer<T>();
        // Please give this a bit more thought. This is just to make a point.
        var type = typeof(T).GetInterfaces().First().GenericTypeArguments.First();
        if (type.Namespace.Contains("command", StringComparison.OrdinalIgnoreCase))
        {
            return defaultName + ".fifo";
        }
        return defaultName;
    }

    public string FormatEntityName<T>()
    {
        var type = typeof(T);
        if (type.Namespace.Contains("command", StringComparison.OrdinalIgnoreCase))
        {
            return type.Name + ".fifo";
        }
        return type.Name;
    }

    public string Message<T>() where T : class
    {
        return defaultFormatter.Message<T>();
    }

    public string Saga<T>() where T : class, ISaga
    {
        return defaultFormatter.Saga<T>();
    }

    public string ExecuteActivity<T, TArguments>() where T : class, IExecuteActivity<TArguments> where TArguments : class
    {
        return defaultFormatter.ExecuteActivity<T, TArguments>();
    }

    public string CompensateActivity<T, TLog>() where T : class, ICompensateActivity<TLog> where TLog : class
    {
        return defaultFormatter.CompensateActivity<T, TLog>();
    }

    public string SanitizeName(string name)
    {
        return name;
    }

    // Delegate to the default formatter instead of leaving this null.
    public string Separator => defaultFormatter.Separator;
}
Note that you will need some way to discriminate between events and commands. In this case, it is done by examining the namespace of the respective type. But you might also consider letting your messages implement corresponding marker interfaces and checking for those here.
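To sketch the marker-interface idea: the ICommand interface and the helper below are assumptions, not part of the original contract library, and the sample records are re-declared here only to keep the sketch self-contained.

```csharp
using System;

// Sketch: discriminate commands from events with a marker interface instead of
// inspecting namespaces.
public interface ICommand { }

public sealed record CoffeeOrdered;              // an event: no marker
public sealed record TellMeYourAge : ICommand;   // a command: marked

public static class MessageKind
{
    // The formatter can call this instead of checking type.Namespace.
    public static bool IsCommand(Type messageType) =>
        typeof(ICommand).IsAssignableFrom(messageType);

    // Commands get the ".fifo" suffix that SQS/SNS require for FIFO entities.
    public static string DecorateEntityName(Type messageType) =>
        IsCommand(messageType) ? messageType.Name + ".fifo" : messageType.Name;
}
```

Inside FormatEntityName<T>() you would then simply return MessageKind.DecorateEntityName(typeof(T)).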
The setup method for MassTransit needs to be adapted as well:
var formatter = new CustomFormatter(new DefaultEndpointNameFormatter(false));
config.MessageTopology.SetEntityNameFormatter(formatter);
config.ConfigureEndpoints(context, formatter);
Also, let's add sending an actual command to our heartbeat sender:
namespace Messaging.Contract.Commands;

public record TellMeYourAge
{
    public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
    public Guid Identifier { get; init; } = Guid.NewGuid();
}
and
...
await messageBus.Publish(new TellMeYourAge(), ctx =>
{
    // FIFO topics require a group ID; the deduplication ID enables exactly-once delivery.
    if (ctx is AmazonSqsMessageSendContext<TellMeYourAge> amazonCtx)
    {
        amazonCtx.GroupId = nameof(TellMeYourAge);
        amazonCtx.DeduplicationId = ctx.Message.Identifier.ToString();
    }
}, cancellationToken);
...
Now it gets interesting. Amazon FIFO topics and queues require you to provide a group ID when publishing messages.
I made an easy choice by using the type name for this, but you can put this to good use. SQS will ensure strict order of messages for a certain group ID.
For example, you might not care whether events for two different customers are processed in order, as long as the order is preserved per customer.
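Here is a sketch of how such a per-customer group ID could be derived. The CustomerMoved record and the helper are hypothetical, purely for illustration:

```csharp
using System;

// Sketch: derive the SQS message group from a customer identifier so ordering
// is enforced per customer while different customers are processed in parallel.
public sealed record CustomerMoved(Guid CustomerId, string NewAddress);

public static class MessageGrouping
{
    // All messages of one customer share a group ID and are therefore ordered.
    public static string GroupIdFor(CustomerMoved message) =>
        $"customer-{message.CustomerId:N}";
}
```

When publishing, you would then set amazonCtx.GroupId = MessageGrouping.GroupIdFor(message) instead of using the type name.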
Also, FIFO provides built-in deduplication, which can be based on message content (disabled by default) or on a deduplication ID that you provide.
I'll leave it to you to implement a consumer for this command.
So far we've considered replacing Amazon's services completely by throwing a localstack instance onto our machine. And we are. Basically.
But if you imagine proper surroundings, you'll very likely have testing, staging, and production environments that you'll not want to mix up. And while there is only one AWS you'll connect to, you somehow need to discriminate topics between environments or else your staging environment might end up consuming production messages and vice versa.
Luckily, our already-introduced CustomFormatter is the right tool for this job. Just make sure that you enrich both topic and endpoint names with your current environment, at least once you connect to the outside world.
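A small sketch of such an environment-aware decoration follows; the class is my own, and in the real formatter the environment value would come from configuration:

```csharp
using System;

// Sketch: prefix topic and endpoint names with the current environment so that
// staging and production never share entities. Keeps the ".fifo" suffix at the
// end, where AWS requires it.
public sealed class EnvironmentNameDecorator
{
    private const string FifoSuffix = ".fifo";
    private readonly string environment;

    public EnvironmentNameDecorator(string environment) => this.environment = environment;

    public string Decorate(string name) =>
        name.EndsWith(FifoSuffix, StringComparison.Ordinal)
            ? $"{environment}-{name[..^FifoSuffix.Length]}{FifoSuffix}"
            : $"{environment}-{name}";
}
```

Calling Decorate from FormatEntityName<T>() and Consumer<T>() in the CustomFormatter would yield names like staging-HeartBeat or staging-TellMeYourAge.fifo.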
Second, what happens if your applications scale horizontally in a cluster, i.e., multiple instances of the same application are consuming and publishing messages?
Luckily, SQS has got your back there - it's called the competing consumer pattern. This basically states that the first consumer to pick up a message from a single endpoint wins the game. So, not much to worry about here.
Writing good integration tests with 3rd party services present can sometimes be really tricky. On the one hand, if these tests run frequently in your CI pipeline, you would not want them to be dependent on the availability of AWS services. Also, since we already learned that we need to distinguish SQS subscriptions per environment, multiple integration tests running at the same time could get in each other's way, causing flaky pipeline failures. This can lead to environment-specific code in your production application, such as "if in pipeline, use in-memory bus". Luckily, MassTransit has a very neat concept for this: its TestHarness.
Basically, it lets you set up your MassTransit configuration as usual and, after you're done, wraps a complete in-memory bubble around your application, rerouting all messaging calls back to itself. That is pretty handy.
We still need to adjust a few minor things.
First, we'll create a new xUnit test project, WebApi.Consumer.Tests, and add Microsoft.AspNetCore.Mvc.Testing as a NuGet package; we will need its WebApplicationFactory.
Second, we'll add the following lines to WebApi.Consumer.csproj to be able to access its entry point later on:
<ItemGroup>
  <InternalsVisibleTo Include="WebApi.Consumer.Tests" />
</ItemGroup>
Third, we'll have to adjust our extension method for configuring MassTransit slightly. Remember we let MassTransit discover our consumers based on the entry assembly? Well, that won't work when we execute tests. So let's make that a bit more resilient:
public static IServiceCollection ConfigureMassTransit(this IServiceCollection services, Assembly consumerAssembly)
{
    services.AddMassTransit(busConfigurator =>
    {
        // The caller now decides which assembly to scan for consumers.
        busConfigurator.AddConsumers(consumerAssembly);
        busConfigurator.UsingAmazonSqs((context, config) =>
        {
            config.Host(new Uri("amazonsqs://localhost:4566"), h =>
            {
                h.AccessKey("test");
                h.SecretKey("test");
                h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566" });
                h.Config(new AmazonSQSConfig { ServiceURL = "http://localhost:4566" });
            });
            var formatter = new CustomFormatter(new DefaultEndpointNameFormatter(false));
            config.MessageTopology.SetEntityNameFormatter(formatter);
            config.ConfigureEndpoints(context, formatter);
        });
    });
    return services;
}
Now we can pass an assembly to this extension to tell MassTransit where our consumers live. That changes the call in both Program.cs files to:
builder.Services.ConfigureMassTransit(typeof(Program).Assembly);
Next, we create a new class in our test project:
internal class CustomWebApplicationFactory : WebApplicationFactory<Program>
{
    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        base.ConfigureWebHost(builder);
        // Wrap the MassTransit test harness around the application under test.
        builder.ConfigureServices(services => services.AddMassTransitTestHarness());
    }
}
This custom WebApplicationFactory references our consumer's Program.cs, lets it start up, and then wraps the MassTransit test harness around it.
Any calls to this application's messaging infrastructure will now be in-memory.
We are now ready to write an actual integration test:
using FluentAssertions;
using MassTransit;
using MassTransit.Testing;
using Messaging.Contract;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

public class MessagingTests : IAsyncDisposable
{
    private readonly CustomWebApplicationFactory applicationFactory;

    public MessagingTests()
    {
        this.applicationFactory = new CustomWebApplicationFactory();
    }

    [Fact]
    public async Task Receives_Heartbeat()
    {
        // Arrange
        var testHarness = this.applicationFactory.Services.GetRequiredService<ITestHarness>();
        var messageBus = this.applicationFactory.Services.GetRequiredService<IBus>();

        // Act
        await messageBus.Publish(new HeartBeat());

        // Assert
        var publishedHeartbeat = await testHarness.Published.Any<HeartBeat>();
        publishedHeartbeat.Should().BeTrue();

        // Give the harness a moment to route the message to the consumer.
        await Task.Delay(TimeSpan.FromSeconds(0.5));
        var consumedHeartBeat = await testHarness.Consumed.Any<HeartBeat>();
        consumedHeartBeat.Should().BeTrue();
    }

    public async ValueTask DisposeAsync()
    {
        await this.applicationFactory.DisposeAsync();
    }
}
This test configures the application, publishes a heartbeat message, gives the harness some time to perform the routing, and then checks whether the message was consumed.
I hope this gives you an idea of how to start testing an application that involves messaging with MassTransit.
MassTransit gives you many more possibilities than just sending commands. Let me give you an example:
You can think of the ConsumeContext<T> that is provided in your consumer as the counterpart of the well-known HttpContext in ASP.NET controllers. Actually, your consumers are very similar to controllers.
With this in mind, you can also perform request/response calls, much like GET requests, using the message bus.
For example, let's create a command and a response to it:
public record TellMeYourAge
{
    public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
    public Guid Identifier { get; init; } = Guid.NewGuid();
}

public record AgeResponse(TimeSpan Age);
Next, we'll add another consumer to our application:
using MassTransit;
using Messaging.Contract.Commands;

public class TellMeYourAgeConsumer : IConsumer<TellMeYourAge>
{
    private readonly ILogger<TellMeYourAgeConsumer> logger;

    public TellMeYourAgeConsumer(ILogger<TellMeYourAgeConsumer> logger)
    {
        this.logger = logger;
    }

    public async Task Consume(ConsumeContext<TellMeYourAge> context)
    {
        this.logger.LogInformation("Received age request");
        // Environment.TickCount is the system uptime in milliseconds; good enough here.
        var age = TimeSpan.FromMilliseconds(Environment.TickCount);
        await context.RespondAsync(new AgeResponse(age));
    }
}
And lastly, we need someone to actually issue that command:
using MassTransit;
using Messaging.Contract.Commands;

public class AgeRequestHandler : IHostedService
{
    private readonly ILogger<AgeRequestHandler> logger;
    private readonly IBus messageBus;

    public AgeRequestHandler(IBus messageBus, ILogger<AgeRequestHandler> logger)
    {
        this.messageBus = messageBus;
        this.logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        while (cancellationToken.IsCancellationRequested == false)
        {
            try
            {
                logger.LogInformation("Sending age request");
                var response = await messageBus.Request<TellMeYourAge, AgeResponse>(new TellMeYourAge(),
                    cancellationToken, default, ctx =>
                    {
                        // The command travels over a FIFO topic, so it needs a group ID.
                        if (ctx is AmazonSqsMessageSendContext<TellMeYourAge> amazonCtx)
                        {
                            amazonCtx.GroupId = nameof(TellMeYourAge);
                            amazonCtx.DeduplicationId = ctx.Message.Identifier.ToString();
                        }
                    });
                logger.LogInformation("Received age response {Age}", response.Message.Age);
                await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
            }
            catch (TaskCanceledException)
            {
                // Expected on shutdown.
            }
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
Note that in the real world, there is more to be dealt with here. The request should have a timeout, since you cannot guarantee that anyone is listening.
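As a hedged sketch, a guarded variant of the request could look like the method below, reusing the messageBus and logger fields of AgeRequestHandler; the five-second timeout is an arbitrary assumption:

```csharp
// Sketch: bound the request with a timeout and handle the case where nobody
// answers. RequestTimeout.After and RequestTimeoutException come from MassTransit.
private async Task RequestAgeWithTimeout(CancellationToken cancellationToken)
{
    try
    {
        var response = await messageBus.Request<TellMeYourAge, AgeResponse>(
            new TellMeYourAge(),
            cancellationToken,
            RequestTimeout.After(s: 5));
        logger.LogInformation("Received age response {Age}", response.Message.Age);
    }
    catch (RequestTimeoutException)
    {
        // Decide here whether to retry, alert, or simply give up.
        logger.LogWarning("Age request timed out after five seconds");
    }
}
```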
Consider all of this code a starting point for scenarios that go deeper than the usual Hello World.
After registering this class as another hosted service and starting both producer and consumer, you should be able to see the following log messages:
info: WebApi.Producer.AgeRequestHandler[0]
Sending age request
info: WebApi.Producer.HeartBeatSender[0]
Sent heartbeat 12/18/2022 12:03:14 +00:00
info: WebApi.Producer.HeartBeatSender[0]
Sent heartbeat 12/18/2022 12:03:19 +00:00
info: WebApi.Producer.HeartBeatSender[0]
Sent heartbeat 12/18/2022 12:03:24 +00:00
info: WebApi.Producer.AgeRequestHandler[0]
Received age response 3.20:08:27.0780000
That's a nice example - but should you (mis)use messaging like this?
Let's remember where we started. We wanted to get away from chained API calls that made our code somehow synchronously wait for results.
The goal was to implement a more asynchronous workflow. Clearly, bending MassTransit to work as an RPC medium is not the right way to go.
MassTransit already has a lot of options in place if you do want to follow a more conversational type of workflow.
In that case, you might want to have a look at Sagas and Activities which are both concepts related to this.
Also, although this post intended to go a bit further beyond typical starter examples, we still covered only the happy parts of this adventure.
If we know anything about our day-to-day job, it's that things will go wrong.
In the messaging world, that means that you can never be sure that your event or command was actually transmitted.
"Not a problem", you might say, "I can send it again" - but what if it has been sent and you just don't know about it, and somewhere else an amount of money has been transferred from a customer's account?
Things are about to get ugly if you don't take care to think about the reliability of your messaging solution.
Of course, clever people already invented patterns to cope with these situations: Inbox and Outbox.
I will try to cover these in a future article.
Many tutorials out there (like this one, so far) will only tell you about the happy path of integrating MassTransit. But let's face it: things will go wrong. That is as likely as the dreaded HTTP 500 error we see in APIs.
By default, whenever one of your consumers faces an exception that it cannot handle, the message is moved to an error queue, since its processing was not successful. MassTransit gives you a few options for dealing with such messages: configuring retry policies, scheduling redelivery, discarding faulted messages, or consuming the Fault<T> messages yourself.
The docs are rather helpful on this, so let's only have a brief look at the last option since it is the one giving you the biggest level of control. To consume faulted messages, you can easily implement the interface we already know:
using MassTransit;
using Messaging.Contract;

// ThrowAnException is assumed to be a simple message record in Messaging.Contract.
public class ThrowAnExceptionConsumer : IConsumer<ThrowAnException>
{
    public Task Consume(ConsumeContext<ThrowAnException> context)
    {
        throw new Exception("Something went wrong.");
    }
}

public class ThrowAnExceptionFaultConsumer : IConsumer<Fault<ThrowAnException>>
{
    private readonly ILogger<ThrowAnExceptionFaultConsumer> logger;

    public ThrowAnExceptionFaultConsumer(ILogger<ThrowAnExceptionFaultConsumer> logger)
    {
        this.logger = logger;
    }

    public Task Consume(ConsumeContext<Fault<ThrowAnException>> context)
    {
        this.logger.LogInformation("Consuming the faulted message");
        return Task.CompletedTask;
    }
}
In the snippet above, you can see the consumer that represents our example of failed message handling: it just throws an exception. You can also see the implementation that consumes the same message wrapped inside a generic Fault<T>, which lets us handle messages from the error queue.
With this simple pattern, you can implement your own error-handling pipeline if you don't want to default to the built-in retry mechanisms.
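For comparison, the built-in retry mechanism is a single line inside the UsingAmazonSqs block; a sketch, where three retries at five-second intervals are assumed values, not a recommendation:

```csharp
// Sketch: retry a failing consumer three times at five-second intervals before
// the message is finally moved to the error queue.
config.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
```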
I'll leave you to it to code this example yourself.
Let's make a cut for now. I hope you could take away one or two things from this walk-through. Messaging is a vast topic in modern software solutions, and it's worth playing around with it from time to time. While it can help your applications scale and grow, it certainly also introduces a level of complexity to your system that not everyone will be accustomed to.
By the way, you can find the code related to this article on GitHub.