I’m working on a .NET 8 API that uses many custom (in-house) NuGet libraries, for example one that wraps some RabbitMQ functionality. The API also uses the CQRS (Mediator) pattern to handle RabbitMQ events.
We declare all the “Publisher” and “Listener” events in Program.cs as follows (I’ll include only the relevant parts):
Program.cs
await RunServer.RunAsync<Startup>(
    args,
    (builder, options) =>
    {
        options.CQRS.Enabled = true;
        options.CQRS.Assemblies = [typeof(xxx.Domain.AssemblyInfo).Assembly];
        options.Event.Enabled = true;
        options.Event.ConnString = builder.Configuration.GetConnectionString("RabbitMQ")!;
        options.Event.PublishQueues =
            RabbitMQUtils.CreatePublishQueue("DataSync_Exchange",
                [
                    (QueueIdentifier.JobManager_JobRequest, "Job_Request")
                ]
            )
            .ToArray();
        options.Event.ListenQueues =
            RabbitMQUtils.CreateListenerQueue("DataSync_Exchange",
                QueueIdentifier.JobManager_JobRequest,
                [
                    "Job_Request"
                ]
            );
        options.OtherServices = () =>
        {
            builder.Services.AddHostedService<HypervisorTrackingBackgroundService>();
        };
    }
);
(I’m using only “JobRequestV0Event” here for the sake of simplicity.)
CreatePublishQueue
public static RabbitMQQueueAlias[] CreatePublishQueue(string exchange, (QueueIdentifier Alias, string RoutingKey)[] queues)
{
    string exchange2 = exchange;
    return queues.Select(((QueueIdentifier Alias, string RoutingKey) q) => new RabbitMQQueueAlias
    {
        Alias = q.Alias,
        RoutingKey = new string[1] { q.RoutingKey },
        Exchange = exchange2,
        Active = true,
        Durable = true
    }).ToArray();
}
CreateListenerQueue
public static RabbitMQQueueAlias[] CreateListenerQueue(string exchange, QueueIdentifier alias, string[] routingKey, string? postfix = null)
{
    string text = alias.ToString();
    string text2 = text.Substring(2, text.Length - 2);
    if (postfix != null)
    {
        text2 += postfix;
    }
    return new RabbitMQQueueAlias[1]
    {
        new RabbitMQQueueAlias
        {
            RoutingKey = routingKey,
            Alias = alias,
            Exchange = exchange,
            Queue = text2,
            Active = true,
            Durable = true
        }
    };
}
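For reference, the queue name that CreateListenerQueue derives from the alias can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: QueueIdentifier is taken to be a plain enum whose member is literally named JobManager_JobRequest (the real type is not shown here), and QueueNameSketch and DeriveQueueName are hypothetical names that only mirror the Substring logic above.

```csharp
using System;

// Hypothetical stand-in for the real QueueIdentifier type, which is not shown.
public enum QueueIdentifier
{
    JobManager_JobRequest
}

public static class QueueNameSketch
{
    // Mirrors the naming logic in CreateListenerQueue: drop the first two
    // characters of the alias name, then append the optional postfix.
    public static string DeriveQueueName(QueueIdentifier alias, string? postfix = null)
    {
        string name = alias.ToString();
        string queue = name.Substring(2, name.Length - 2);
        return postfix != null ? queue + postfix : queue;
    }

    public static void Main()
    {
        // Under the assumption above, this prints "bManager_JobRequest".
        Console.WriteLine(DeriveQueueName(QueueIdentifier.JobManager_JobRequest));
    }
}
```

If the assumption holds, that derived name is the queue to look for in the management console, and it should be bound to “DataSync_Exchange” with the routing key “Job_Request”.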
In the “HypervisorTrackingBackgroundService” registered in Program.cs, the publish is triggered as follows:
IJobUtility.cs
public interface IJobUtility
{
    Task PublishRequestJob(string name, string? notes = null, string @params = "");
}
public sealed class JobUtility : IJobUtility
{
    private readonly IEventBus _eventBus;
    private readonly IVdcIdGenerator _gen;
    public JobUtility(IEventBus eventBus, IVdcIdGenerator gen)
    {
        _eventBus = eventBus;
        _gen = gen;
    }
    public async Task PublishRequestJob(string name, string? notes = null, string @params = "")
    {
        var id = _gen.GenerateId();
        await _eventBus.PublishAsync(new JobRequestV0Event(
            id,
            new Guid(1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0),
            _gen.WorkerId,
            _gen.GenerateIdAsString(),
            new JobRequestV0Event.B(name, @params, notes)));
    }
}
Here, “PublishAsync” is a wrapper around “SendAsync” (located in EventBus):
SendAsync
protected virtual Task SendAsync(object graph, Guid id, string eventName, string? correlationId, bool useEnvelop)
{
    string eventName2 = eventName;
    object graph2 = graph;
    _channel.ConfirmSelect();
    IBasicProperties basicProperties = _channel.CreateBasicProperties();
    basicProperties.Persistent = true;
    basicProperties.ContentType = "application/json";
    basicProperties.MessageId = id.ToString();
    IBasicProperties basicProperties2 = basicProperties;
    if (basicProperties2.Headers == null)
    {
        IDictionary<string, object> dictionary2 = (basicProperties2.Headers = new Dictionary<string, object>());
    }
    basicProperties.Headers["X-Correlation-ID"] = correlationId;
    basicProperties.Headers["X-Event"] = eventName2;
    basicProperties.Headers["X-Has-Envelop"] = useEnvelop;
    _setup?.Invoke(graph2, basicProperties);
    IEnumerable<RabbitMQQueueAlias<TAlias>> enumerable = _queue;
    if (_filter != null)
    {
        enumerable = enumerable.Where((RabbitMQQueueAlias<TAlias> queue) => _filter(queue.Alias, GenEventName(queue, eventName2, graph2), graph2));
    }
    foreach (RabbitMQQueueAlias<TAlias> item in enumerable)
    {
        object obj = _transform?.Invoke(item.Alias, eventName2, graph2) ?? graph2;
        string[] routingKey = item.RoutingKey;
        foreach (string text in routingKey)
        {
            if (useEnvelop)
            {
                obj = new MessageEnvelop(obj, _resolver.Resolver(obj.GetType()) ?? text);
            }
            byte[] bytes = Encoding.UTF8.GetBytes(_serializer.Serialize(obj));
            _channel.BasicPublish(item.Exchange, text, mandatory: true, basicProperties, bytes);
            _channel.WaitForConfirms();
            _logger?.LogInformation("Publish Event to {Exchange}, {RoutingKey}, {@Event}", item.Exchange, item.RoutingKey, obj);
        }
    }
    return Task.CompletedTask;
}
The background service runs without problems, and so does the publish (at least it does not throw any exception), but the CQRS handler that should receive the published event never fires (the breakpoint is never hit).
JobRequestEventHandler
internal class JobRequestEventHandler : IVdcEventHandler<JobRequestV0Event>
{
    private readonly IServiceProvider _provider;
    public JobRequestEventHandler(IServiceProvider provider)
    {
        _provider = provider;
    }
    public async ValueTask<IResponse> HandleAsync(JobRequestV0Event request, CancellationToken ct = default)
    {
        Console.WriteLine("JobRequest is started being consumed...");
        using var scope = _provider.CreateScope();
        var jobUtility = scope.ServiceProvider.GetService<IJobUtility>();
        ...
    }
}
After “PublishAsync” runs, I check the RabbitMQ management console and see some activity under “Message rates”, but nothing at all appears in the “Queued messages” graph, and under “Details” the total is always 0.
I cannot tell whether the problem is in the publisher (it might be, since “Queued messages” never shows anything and the total is always 0) or in the CQRS handler. Either way, the breakpoint I set in the listener (handler) is never hit, the Console.WriteLine prints nothing, and no exception is thrown.
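One way to narrow this down: SendAsync publishes with mandatory: true, so a message that the exchange cannot route to any queue is handed back to the publisher via basic.return, while the publisher confirm may still arrive as a positive ack. That combination would match publish activity in “Message rates” with nothing ever queued. Below is a minimal sketch for surfacing such returns. It assumes RabbitMQ.Client 6.x (the IModel-based API that SendAsync appears to use); ReturnedMessageDiagnostics and LogReturnedMessages are hypothetical names, and the helper would have to be called on the same channel the library publishes on, which may not be accessible from outside the NuGet package.

```csharp
using System;
using RabbitMQ.Client;

public static class ReturnedMessageDiagnostics
{
    // Hypothetical helper: subscribe once, before publishing, on the channel
    // that performs BasicPublish with mandatory: true.
    public static void LogReturnedMessages(IModel channel)
    {
        // BasicReturn is raised when the broker hands back a mandatory
        // message that could not be routed to any queue.
        channel.BasicReturn += (sender, ea) =>
        {
            Console.WriteLine(
                $"Returned message: exchange={ea.Exchange}, " +
                $"routingKey={ea.RoutingKey}, reply={ea.ReplyCode} {ea.ReplyText}");
        };
    }
}
```

If this logs a NO_ROUTE reply for “Job_Request”, the message never reached a queue, which would point at the exchange binding or the listener queue declaration rather than at the CQRS handler.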
Any help?