RabbitMQ Transport

The RabbitMQ transport connects Mocha to a RabbitMQ broker for production messaging. It manages connections, provisions exchanges and queues automatically, handles message acknowledgement, and supports request/reply with dedicated reply endpoints. When you need durable, distributed messaging across multiple services, this is the transport to use.

Set up the RabbitMQ transport#

By the end of this section, you will have a Mocha bus connected to RabbitMQ with automatic topology provisioning.

Install the package#

Bash
dotnet add package Mocha.Transport.RabbitMQ

Register with .NET Aspire#

The most common setup uses the Aspire RabbitMQ component for connection management:

Bash
dotnet add package Aspire.RabbitMQ.Client

C#
using Mocha;
using Mocha.Transport.RabbitMQ;

var builder = WebApplication.CreateBuilder(args);

// Aspire registers IConnectionFactory from the "rabbitmq" connection resource
builder.AddRabbitMQClient("rabbitmq");

// Register the message bus with RabbitMQ transport
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ();

var app = builder.Build();
app.Run();

The Aspire component reads the connection string from configuration (typically ConnectionStrings:rabbitmq), handles health checks, and integrates with the Aspire dashboard for observability.

.AddRabbitMQ() picks up the IConnectionFactory from DI (registered by Aspire) and uses it to establish connections to the broker. Default conventions automatically create exchanges, queues, and bindings for your registered handlers.

Register with a manual connection string#

If you are not using Aspire, register the IConnectionFactory directly:

C#
using Mocha;
using Mocha.Transport.RabbitMQ;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// Register IConnectionFactory manually
builder.Services.AddSingleton<IConnectionFactory>(_ =>
    new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5672,
        VirtualHost = "/",
        UserName = "guest",
        Password = "guest"
    });

builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ();

var app = builder.Build();
app.Run();

To use a connection string from configuration:

C#
builder.Services.AddSingleton<IConnectionFactory>(_ =>
{
    var factory = new ConnectionFactory();
    factory.Uri = new Uri(builder.Configuration.GetConnectionString("rabbitmq")!);
    return factory;
});

Use a custom connection provider#

For full control over connection lifecycle, provide a custom IRabbitMQConnectionProvider:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConnectionProvider(sp =>
        {
            return sp.GetRequiredService<MyCustomConnectionProvider>();
        });
    });

The IRabbitMQConnectionProvider interface exposes Host, Port, VirtualHost, and a CreateAsync method. When no custom provider is registered, the transport falls back to resolving IConnectionFactory from DI and wrapping it in a default provider.

Verify it works#

Add an endpoint that publishes through the bus and verify the handler executes:

C#
app.MapPost("/orders", async (IMessageBus bus) =>
{
    await bus.PublishAsync(new OrderPlacedEvent
    {
        OrderId = Guid.NewGuid(),
        CustomerId = "customer-1",
        TotalAmount = 99.99m
    }, CancellationToken.None);

    return Results.Ok();
});

Send a POST request to /orders and check your application logs. You should see the handler process the event. You can also inspect the RabbitMQ management UI at http://localhost:15672 to see the auto-provisioned exchanges and queues.

Two connections per broker transport#

Mocha opens two connections to the broker: one for consuming and one for dispatching.

This design prevents back-pressure from slow consumers from blocking outbound message publishing. When a consumer processes messages slowly, the RabbitMQ client applies back-pressure on that connection. Without separation, a slow consumer could prevent your application from publishing new messages entirely. With separate connections, each direction operates independently.

How topology works#

When the transport starts, it provisions topology on the broker automatically. Here is how message types map to RabbitMQ resources:

Diagram: Publisher -(publish)-> Exchange order-placed-event -(binding)-> Queue billing-service -(consume)-> Consumer

Events (publish/subscribe): Each event type gets a fanout exchange. Each service that subscribes creates a queue bound to that exchange. Publishing sends the message to the exchange, which fans it out to all bound queues.

Commands (send): Each command type gets a direct exchange bound to a single queue. Sending delivers the message to exactly one consumer.

Request/reply: The transport creates a temporary reply queue per service instance. The reply address is embedded in the request message so the responder knows where to send the reply.

Warning

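Messages published before the transport completes its Start phase may be lost if no queue is bound to the exchange yet. During deployment, ensure consuming services start before publishing services. Publisher confirms alone do not detect this case, because the broker confirms a message even when it routes to no queue. To capture unroutable messages, configure an alternate exchange (the alternate-exchange argument).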

If a message is published to an exchange with no bound queue - for example, when no consumer has started - that message is dropped. Mocha auto-provisions topology, but the window between exchange creation and queue binding is a real operational risk.

Publisher confirms#

Mocha's RabbitMQ transport uses publisher confirms on dispatch, which means the broker acknowledges each published message before the publish call completes. This provides at-least-once delivery guarantees for outbound messages: if the broker does not confirm, the publish fails with an exception. See the RabbitMQ Reliability Guide for a full treatment of delivery guarantees.
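
Because a missing confirm surfaces as an exception from the publish call, the caller can decide how to react. The sketch below reworks the /orders endpoint used elsewhere on this page and is illustrative only. The exact exception type is not documented here, so the example catches Exception broadly, and answering with HTTP 503 is an assumed policy, not something the library prescribes.

C#
app.MapPost("/orders", async (IMessageBus bus, ILogger<Program> logger, CancellationToken ct) =>
{
    try
    {
        // Completes only after the broker has confirmed the message
        await bus.PublishAsync(new OrderPlacedEvent
        {
            OrderId = Guid.NewGuid(),
            CustomerId = "customer-1",
            TotalAmount = 99.99m
        }, ct);

        return Results.Accepted();
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Assumption: an unconfirmed publish surfaces as an exception here
        logger.LogError(ex, "Publishing OrderPlacedEvent failed");
        return Results.StatusCode(StatusCodes.Status503ServiceUnavailable);
    }
});

Returning an error lets the caller retry, which fits the at-least-once model: a retried publish may produce a duplicate, so handlers should tolerate receiving the same event twice.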

Default topology for event handlers#

When you register an event handler with AddEventHandler<T>(), the RabbitMQ transport creates the following topology.

A fanout exchange named after the message type fans out to per-service exchanges, which bind to per-service queues. This allows multiple services to each receive a copy of every published event.

Default topology for send handlers#

When you register a request handler with AddRequestHandler<T>() for send (fire-and-forget), the transport creates a single queue.

Send messages go to a dedicated queue. Only one handler processes each message - this is the point-to-point guarantee.

Configure transport-level defaults#

You can set defaults that apply to all auto-provisioned queues and exchanges. This is useful when you want consistent settings across all resources without configuring each one individually.

Use ConfigureDefaults to set queue and exchange defaults:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            // All queues will be quorum with a delivery limit of 5
            defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
            defaults.Queue.Arguments["x-delivery-limit"] = 5;

            // All exchanges will use topic routing
            defaults.Exchange.Type = RabbitMQExchangeType.Topic;
        });
    });

For example, to enable quorum queues with a specific initial group size:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Queue.QueueType = RabbitMQQueueType.Quorum;
            defaults.Queue.Arguments["x-quorum-initial-group-size"] = 3;
        });
    });

Or to use stream queues for append-only log semantics:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Queue.QueueType = RabbitMQQueueType.Stream;
        });
    });

Available queue defaults:

Property | Type | Description
QueueType | string | Queue type: RabbitMQQueueType.Classic, .Quorum, or .Stream
Durable | bool? | Whether queues survive broker restarts (default: true)
AutoDelete | bool? | Whether queues are auto-deleted when unused (default: false)
Arguments | Dictionary<string, object> | Additional arguments (e.g., x-delivery-limit, x-max-priority)

Available exchange defaults:

Property | Type | Description
Type | string | Exchange type: RabbitMQExchangeType.Fanout, .Direct, .Topic, or .Headers
Durable | bool? | Whether exchanges survive broker restarts (default: true)
AutoDelete | bool? | Whether exchanges are auto-deleted when unused (default: false)
Arguments | Dictionary<string, object> | Additional arguments (e.g., alternate-exchange)

Defaults never override explicitly configured values. If you declare a queue with a specific queue type, that setting takes precedence over the transport default. You can call ConfigureDefaults multiple times - each call accumulates settings on the same defaults object.
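
A minimal sketch of both rules, using only the builder calls shown on this page. The queue name and handler are reused from other examples here. The comments describe the behavior stated above rather than anything verified against a broker.

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.BindExplicitly();

        // First call: classic queues unless a queue says otherwise
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Queue.QueueType = RabbitMQQueueType.Classic;
        });

        // Second call: adds to the same defaults object, keeping the queue type above
        transport.ConfigureDefaults(defaults =>
        {
            defaults.Exchange.Durable = true;
        });

        // Explicit queue type: stays quorum despite the classic default
        transport.Queue("orders.processing")
            .BindImplicitly()
            .Quorum()
            .Handler<OrderPlacedEventHandler>();
    });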

Configure queues#

Use transport.Queue("name") when you need to customize a RabbitMQ queue. The queue builder is the primary API for queue names, queue type, arguments, handler assignment, source bindings, prefetch, concurrency, fault queues, and skipped queues.

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.BindExplicitly();

        transport.Queue("orders.processing")
            .BindImplicitly()
            .Quorum()
            .MaxPrefetch(50)
            .MaxConcurrency(10)
            .FaultEndpoint("order-errors")
            .Handler<OrderPlacedEventHandler>();
    });

BindExplicitly() at the transport scope means only queues you configure are used for receiving. BindImplicitly() on the queue tells Mocha to generate the convention-derived exchange bindings for the messages handled by that queue.

Calling Queue("name") without Handler<T>(), Consumer<T>(), or Receives<T>() declares only the RabbitMQ queue. Add a handler, consumer, or received message type when the queue should also consume messages.

C#
transport.Queue("audit-log")
    .Quorum()
    .AutoProvision(false);

For custom source topology, bind the queue from a specific exchange:

C#
transport.Queue("eu-orders")
    .BindExplicitly()
    .BindFrom(new Uri("exchange:region-events"), "eu.*")
    .Consumer<EuRegionConsumer>();

BindExplicitly() on the queue suppresses convention-derived exchange bindings for that queue, so only the BindFrom(...) sources are used.

Declare topology resources#

Mocha auto-provisions topology from registered handlers and queue builders by default.

Caution

Use DeclareExchange(), DeclareQueue(), and DeclareBinding() only when you need infrastructure topology that is not represented by a queue builder, or when you are aligning with topology managed outside of Mocha. For handler queues, prefer transport.Queue("name").

To declare infrastructure-only topology:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        transport.DeclareExchange("order-events")
            .Type(RabbitMQExchangeType.Fanout)
            .Durable()
            .AutoProvision();

        transport.DeclareQueue("billing-orders")
            .Durable()
            .AutoProvision()
            .WithArgument("x-queue-type", "quorum");

        transport.DeclareBinding("order-events", "billing-orders")
            .AutoProvision();
    });

All declared topology is provisioned when the transport starts, before receive endpoints begin consuming.

Control auto-provisioning#

By default, the transport auto-provisions all topology resources (exchanges, queues, bindings) on the broker at startup. In production environments where infrastructure is managed externally - for example by Terraform, Ansible, or the RabbitMQ Messaging Topology Operator on Kubernetes - you can disable auto-provisioning so the transport expects resources to already exist.

The examples in this section use DeclareExchange(), DeclareQueue(), and DeclareBinding() because they are about broker topology management. For application handler queues, use Queue("name") instead.

Disable globally#

Turn off auto-provisioning for the entire transport:

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.AutoProvision(false);
    });

With auto-provisioning disabled, the transport will not create any exchanges, queues, or bindings. All resources must already exist on the broker before the transport starts.

Override per resource#

Individual resources can override the transport-level setting. This is useful when most topology is managed externally but a few resources need to be created dynamically:

C#
builder.Services
    .AddMessageBus()
    .AddRabbitMQ(transport =>
    {
        // Disable globally
        transport.AutoProvision(false);

        // This exchange already exists on the broker - skip provisioning
        transport.DeclareExchange("order-events");

        // This queue should be created by the transport
        transport.DeclareQueue("billing-orders")
            .AutoProvision(true);

        // This binding should also be created
        transport.DeclareBinding("order-events", "billing-orders")
            .AutoProvision(true);
    });

The effective auto-provision value for each resource follows a cascading pattern:

Resource setting | Transport setting | Result
true | any | Provisioned
false | any | Not provisioned
not set | true (default) | Provisioned
not set | false | Not provisioned

When a resource does not specify AutoProvision, it inherits the transport-level default. When the transport does not specify AutoProvision, it defaults to true.
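
The cascade can be modeled as two null-coalescing steps. The following standalone sketch is not Mocha code: AutoProvisionRules and Resolve are hypothetical names used only to illustrate the rule, with null standing for "not set".

C#
using System;

// Resource setting wins, then the transport setting, then the default of true
Console.WriteLine(AutoProvisionRules.Resolve(resource: true, transport: false));  // True
Console.WriteLine(AutoProvisionRules.Resolve(resource: false, transport: true));  // False
Console.WriteLine(AutoProvisionRules.Resolve(resource: null, transport: null));   // True
Console.WriteLine(AutoProvisionRules.Resolve(resource: null, transport: false));  // False

// Hypothetical helper, for illustration only
public static class AutoProvisionRules
{
    public static bool Resolve(bool? resource, bool? transport) =>
        resource ?? transport ?? true;
}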

Common patterns#

Fully managed infrastructure: Disable auto-provisioning globally and declare all resources without AutoProvision. The transport will use existing broker resources without attempting to create them.

C#
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");

Selective provisioning: Disable globally but enable for specific resources that are owned by this service.

C#
transport.AutoProvision(false);
transport.DeclareExchange("shared-events");              // managed externally
transport.DeclareQueue("my-service-queue")
    .AutoProvision(true);                                // owned by this service
transport.DeclareBinding("shared-events", "my-service-queue")
    .AutoProvision(true);                                // owned by this service

Kubernetes with the Messaging Topology Operator: When the RabbitMQ Messaging Topology Operator manages your exchanges, queues, and bindings as Kubernetes custom resources, disable auto-provisioning entirely. The operator declares topology through CRDs, and the transport simply uses the existing resources:

YAML
# Kubernetes CRD - managed by the Messaging Topology Operator
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: billing-orders
spec:
  name: billing-orders
  durable: true
  rabbitmqClusterReference:
    name: my-cluster

C#
// Application code - topology already exists on the broker
transport.AutoProvision(false);
transport.DeclareExchange("order-events");
transport.DeclareQueue("billing-orders");
transport.DeclareBinding("order-events", "billing-orders");

Opt-out individual resources: Keep auto-provisioning enabled but skip specific resources that are managed elsewhere.

C#
transport.DeclareExchange("platform-events")
    .AutoProvision(false);                               // managed by platform team
transport.DeclareQueue("my-queue");                      // auto-provisioned (default)
transport.DeclareBinding("platform-events", "my-queue"); // auto-provisioned (default)

Configure convention endpoints#

Use transport.Handler<T>() at the end of the transport configuration when you want to keep the convention-derived queue name and only tune one handler endpoint:

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.Handler<OrderPlacedEventHandler>()
            .ConfigureEndpoint(e => e.MaxPrefetch(50).MaxConcurrency(10));
    });

This keeps the convention-derived endpoint name while tuning the consumer settings. ConfigureEndpoint() can be called multiple times - actions compose in declaration order:

C#
transport.Handler<OrderPlacedEventHandler>()
    .ConfigureEndpoint(e => e.MaxPrefetch(50))
    .ConfigureEndpoint(e => e.MaxConcurrency(10))
    .ConfigureEndpoint(e => e.FaultEndpoint("order-errors"));

For full control over the queue name, queue type, source bindings, and handler assignment, use Queue("name") instead.

MaxPrefetch controls how many unacknowledged messages RabbitMQ delivers to the consumer at once. Default: 100. Lower values reduce memory pressure under high load. Higher values improve throughput for fast handlers.

MaxConcurrency controls how many messages the endpoint processes in parallel. Set this based on your handler's throughput characteristics.

A good starting point: set MaxPrefetch equal to or slightly higher than MaxConcurrency. For slow handlers (long database operations, external API calls), lower MaxPrefetch to between 10 and 20 to prevent messages from piling up in the consumer's unacknowledged buffer. For quorum queues specifically, avoid setting MaxPrefetch to 1: a prefetch of 1 starves consumers while acknowledgements flow through the consensus mechanism, which significantly reduces throughput.
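
As a rough illustration of that starting point for a slow handler, the sketch below keeps the convention-derived endpoint and sets prefetch slightly above concurrency. The values 12 and 8 are assumptions chosen for the example, not measured recommendations.

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.Handler<OrderPlacedEventHandler>()
            .ConfigureEndpoint(e => e
                .MaxPrefetch(12)       // slightly above concurrency, low enough for a slow handler
                .MaxConcurrency(8));   // eight messages processed in parallel
    });

Measure under realistic load before settling on values, since the right numbers depend on how long each handler invocation takes.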

For prefetch tuning guidance from first principles, see CloudAMQP Best Practices.

Example with the preferred queue builder:

C#
builder.Services
    .AddMessageBus()
    .AddEventHandler<OrderPlacedEventHandler>()
    .AddRabbitMQ(transport =>
    {
        transport.BindExplicitly();

        transport.Queue("orders.processing")
            .BindImplicitly()
            .MaxPrefetch(50)
            .MaxConcurrency(10)
            .Handler<OrderPlacedEventHandler>();
    });

Auto-provisioned resource naming#

Resource | Naming convention | Created when
Exchange (event) | Message type name (e.g., OrderPlacedEvent) | First publish or subscribe
Exchange (command) | Message type name (e.g., ReserveInventoryCommand) | First send or handler registration
Queue | Endpoint name derived from handler registration | Handler is bound to the transport
Reply queue | Instance-specific name | Transport starts
Bindings | Exchange-to-queue | Endpoint discovery phase

All auto-provisioned resources are durable by default and survive broker restarts.

Routing keys#

RabbitMQ uses a routing_key field on every published message to decide which queues receive it. When you publish to a topic exchange, the broker compares the message's routing key against binding patterns on each queue. Queues whose pattern matches get the message. Queues that don't match never see it.

Direct exchanges work the same way, but require an exact match instead of a pattern.

Fanout exchanges ignore routing keys entirely - every bound queue gets every message.

Routing keys are useful when you need to split a single message stream across different consumers based on a property of the message itself:

  • Decoupling producers from consumers - publish messages without knowing which queues or services will consume them. Consumers bind with patterns to receive only the messages they care about.
  • Multi-tenant routing - route messages to tenant-specific queues (tenant-a.orders, tenant-b.orders)
  • Region-based routing - route to regional processors (us.east, eu.west)
  • Priority routing - separate high-priority and low-priority messages (priority.high, priority.low)

For a full treatment of topic exchange routing, see the RabbitMQ Topics Tutorial.

Configure routing key extraction#

To set a routing key on published messages, call UseRabbitMQRoutingKey<T>() when registering the message type:

C#
builder.Services
    .AddMessageBus()
    .AddMessage<OrderEvent>(m => m
        .UseRabbitMQRoutingKey<OrderEvent>(msg => msg.Region))
    .AddRabbitMQ();

The extractor function runs at dispatch time for each message. It receives the message instance and returns the routing key string. Return null to publish without a routing key.

UseRabbitMQRoutingKey<T>() is configured on AddMessage<T>(), not on the transport or endpoint. This keeps routing key logic next to the message definition where it belongs.
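
Since the extractor is an ordinary function, it can normalize the value or opt out per message. The sketch below assumes OrderEvent.Region is a string that may be empty. It returns null in that case, which publishes without a routing key, and otherwise lowercases the region so keys compare consistently against binding patterns.

C#
builder.Services
    .AddMessageBus()
    .AddMessage<OrderEvent>(m => m
        .UseRabbitMQRoutingKey<OrderEvent>(msg =>
            string.IsNullOrWhiteSpace(msg.Region)
                ? null                              // publish without a routing key
                : msg.Region.ToLowerInvariant()))   // e.g. "EU.West" becomes "eu.west"
    .AddRabbitMQ();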

Composite routing keys#

Combine multiple properties into a single routing key using string interpolation:

C#
builder.Services
    .AddMessageBus()
    .AddMessage<OrderEvent>(m => m
        .UseRabbitMQRoutingKey<OrderEvent>(msg => $"{msg.TenantId}.{msg.Region}"))
    .AddRabbitMQ();

This produces routing keys like acme.us.east or contoso.eu.west, which you can match with topic exchange binding patterns like acme.# or *.eu.*.
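
Note that a region such as us.east already contains a dot, so the composite key has three words rather than two. The standalone sketch below shows this with a hypothetical OrderEvent record. The real message type may look different.

C#
using System;

var evt = new OrderEvent("acme", "us.east");

// Same interpolation as the extractor above
Func<OrderEvent, string> extractor = msg => $"{msg.TenantId}.{msg.Region}";

var key = extractor(evt);
Console.WriteLine(key);                    // acme.us.east
Console.WriteLine(key.Split('.').Length);  // 3

// Hypothetical message shape, for illustration only
public sealed record OrderEvent(string TenantId, string Region);

A binding pattern of acme.* would therefore not match this key, because * covers exactly one word, while acme.# would.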

Topic exchange example#

This example routes region-tagged events to different queues based on their routing key. The US queue receives messages matching us.*, and the EU queue receives messages matching eu.*.

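Diagram: Publisher sends a message with region = us.east to the topic exchange region-events. The us.* binding matches and delivers to queue us-orders (US Consumer). The eu.* binding on queue eu-orders does not match.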

Define the message type#

C#
public sealed class RegionEvent
{
    public required string Region { get; init; }
    public required string Payload { get; init; }
}

Wire up the bus#

C#
builder.Services
    .AddMessageBus()
    .AddConsumer<UsRegionConsumer>()
    .AddConsumer<EuRegionConsumer>()
    .AddMessage<RegionEvent>(m => m
        .UseRabbitMQRoutingKey<RegionEvent>(msg => msg.Region))
    .AddRabbitMQ(transport =>
    {
        transport.BindExplicitly();

        // Declare the exchange so the transport provisions it as a topic exchange
        transport.DeclareExchange("region-events")
            .Type(RabbitMQExchangeType.Topic);

        transport.Queue("us-orders")
            .BindExplicitly()
            .BindFrom(new Uri("exchange:region-events"), "us.*")
            .Consumer<UsRegionConsumer>();
        transport.Queue("eu-orders")
            .BindExplicitly()
            .BindFrom(new Uri("exchange:region-events"), "eu.*")
            .Consumer<EuRegionConsumer>();

        // Dispatch to the topic exchange
        transport.DispatchEndpoint("region-dispatch")
            .ToExchange("region-events")
            .Publish<RegionEvent>();
    });

When you publish a RegionEvent with Region = "us.east", the routing key middleware extracts "us.east" from the message and sets it on the AMQP publish. The topic exchange matches "us.east" against us.* (match) and eu.* (no match). Only the US queue receives the message.

Topic exchange binding patterns#

Pattern | Matches | Does not match
us.* | us.east, us.west | us.east.az1, eu.west
eu.# | eu.west, eu.west.az1 | us.east
# | Everything | -
*.*.az1 | us.east.az1, eu.west.az1 | us.east

* matches exactly one word. # matches zero or more words. Words are separated by dots.
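
To make the wildcard rules concrete, here is a small standalone matcher that follows them. It is a teaching sketch, not the broker's implementation: TopicPattern is a hypothetical helper, and it ignores edge cases such as empty routing keys.

C#
using System;

Console.WriteLine(TopicPattern.Matches("us.*", "us.east"));       // True
Console.WriteLine(TopicPattern.Matches("us.*", "us.east.az1"));   // False
Console.WriteLine(TopicPattern.Matches("eu.#", "eu.west.az1"));   // True
Console.WriteLine(TopicPattern.Matches("*.*.az1", "us.east"));    // False

// Hypothetical helper, for illustration only
public static class TopicPattern
{
    public static bool Matches(string pattern, string routingKey) =>
        Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: match only if the key is exhausted too
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' absorbs zero or more words: try every possible split point
            for (var n = j; n <= k.Length; n++)
            {
                if (Match(p, i + 1, k, n)) return true;
            }
            return false;
        }

        // '*' and literal words each need exactly one word from the key
        if (j == k.Length) return false;
        return (p[i] == "*" || p[i] == k[j]) && Match(p, i + 1, k, j + 1);
    }
}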

Direct exchange routing keys#

Direct exchanges use exact-match routing keys instead of patterns. A message with routing key "priority-high" reaches only queues bound with exactly "priority-high".

C#
builder.Services
    .AddMessageBus()
    .AddConsumer<HighPriorityConsumer>()
    .AddMessage<TaskEvent>(m => m
        .UseRabbitMQRoutingKey<TaskEvent>(msg => $"priority-{msg.Priority}"))
    .AddRabbitMQ(transport =>
    {
        transport.BindExplicitly();

        transport.DeclareExchange("task-routing")
            .Type(RabbitMQExchangeType.Direct);

        transport.Queue("high-priority-tasks")
            .BindExplicitly()
            .BindFrom(new Uri("exchange:task-routing"), "priority-high")
            .Consumer<HighPriorityConsumer>();

        transport.DispatchEndpoint("task-dispatch")
            .ToExchange("task-routing")
            .Publish<TaskEvent>();
    });

Messages with Priority = "high" reach the queue. Messages with any other priority are dropped by the exchange (unless another queue is bound with a matching routing key).

Next steps#

Runnable example: RabbitMQ

Full demo: All three Demo services use RabbitMQ in production mode with .NET Aspire. See Demo.AppHost for the Aspire orchestration and Demo.Catalog for a complete service using .AddRabbitMQ() with outbox, inbox, sagas, and multiple handler types.

Last updated by Tobias Tengler