I've been evaluating different .NET messaging components and I'm having some trouble with Rebus using MSMQ that I haven't been able to resolve with docs, samples, or Google.
I believe the only thing unique about my scenario is that my client can Send or Publish messages. Seems like it should be simple, but I'm having the following issues. Code snippets follow the issues.
Issues:
- MetersRequest not being handled. They are just piling up in outgoingMetersRequests queue.
- Getting an error saying that creditAuthRequests queue doesn't exist or I don't have permissions. I've confirmed the queue is not being created, but my assumption was that Rebus would ensure it's created, just as the outgoingMetersRequests queue is being created. (Side note: The error goes away if I remove the 'await' from Bus.Send(...), but the queue is still not created and the messages are nowhere to be found.)
===========
FROM CLIENT
private void InitializeBus()
{
    _messageActivator = new BuiltinHandlerActivator();
    Configure.With(_messageActivator)
        .Transport(t => t.UseMsmq("publisher"))
        .Routing(r => r.TypeBased()
            .Map<MetersRequest>("outgoingMetersRequests")
            .Map<CreditAuthorizationRequest>("creditAuthRequests"))
        .Start();
}

private async Task SendCreditAuthRequestAsync(int numberToSend)
{
    var cardNumber = generateCardNumber();
    await _messageActivator.Bus.Send(new CreditAuthorizationRequest(cardNumber));
    await WriteOutputAsync($"Sent credit auth request for card {cardNumber}.");
}

private async Task SendMetersRequestAsync(int numberToSend)
{
    await _messageActivator.Bus.Send(new MetersRequest());
    await WriteOutputAsync("Sent meters request.");
}
=========== END CLIENT
============
FROM SERVICE
private void InitializeBus()
{
    _messageActivator = new BuiltinHandlerActivator();
    _messageActivator.Register<PosOnlineHandler>(() => new PosOnlineHandler(WriteOutputAsync));
    _messageActivator.Register<PumpDownHandler>(() => new PumpDownHandler(WriteOutputAsync));
    _messageActivator.Register<MetersRequestHandler>(() => new MetersRequestHandler(WriteOutputAsync, _messageActivator.Bus));
    _messageActivator.Register<CreditAuthorizationHandler>(() => new CreditAuthorizationHandler(WriteOutputAsync, _messageActivator.Bus));
    Configure.With(_messageActivator)
        .Transport(t => t.UseMsmq("subscriber1"))
        .Routing(r => r.TypeBased()
            .Map<PumpDownEvent>("publisher")
            .Map<PosOnlineEvent>("publisher")
            .Map<MetersRequest>("outgoingMetersRequests")
            .Map<CreditAuthorizationRequest>("creditAuthRequests"))
        .Start();
    _messageActivator.Bus.Subscribe<PumpDownEvent>().Wait();
    _messageActivator.Bus.Subscribe<PosOnlineEvent>().Wait();
}

public class MetersRequestHandler : IHandleMessages<MetersRequest>
{
    private readonly Random _randomizer = new Random();
    private readonly Func<string, Task> _outputDelegate2;
    private readonly IBus _messageBus;

    public MetersRequestHandler(Func<string, Task> outputDelegate, IBus messageBus)
    {
        _outputDelegate2 = outputDelegate;
        _messageBus = messageBus;
    }

    public async Task Handle(MetersRequest message)
    {
        var pump = _randomizer.Next(20);
        var meters = _randomizer.Next();
        decimal dollars = (decimal)_randomizer.NextDouble();
        var response = new MetersResponse(pump, meters, dollars);
        await _outputDelegate2($"Sending MetersResponse: (Pump={pump}) (Meters={meters}) (Dollars ={dollars}");
        await _messageBus.Reply(response);
    }
}
============
There's a couple of things that seem a little off in the code you posted. I'll just comment on your issues, and then I will suggest a way forward :)
When you call
.Routing(t => t.TypeBased().Map<SomeMessage>("someQueue"))
you are saying that the SomeMessage type is "owned" by the Rebus endpoint with the input queue someQueue.
When you await bus.Send(yourMessage), Rebus will get the queue that owns the type of yourMessage and send the message there. This explains pretty well why your MetersRequests go to the outgoingMetersRequests queue.
However, you haven't posted any code that shows an endpoint that has the outgoingMetersRequests queue as its input queue. Someone will need to be processing messages off of that queue in order for something to happen.
The "input queue" is what you configure in the .Transport(t => t.UseMsmq("publisher")) part; in this case, the input queue is publisher.
As for the error about the creditAuthRequests queue: yeah, MSMQ error messages usually aren't the best ;)
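To make the ownership idea concrete, here is a minimal sketch of how the client's routing might look if the service from the question (input queue subscriber1 in the posted code) is the endpoint that handles both request types. That is an assumption based on the handlers registered there. ClientBus and Start are hypothetical names, MetersRequest and CreditAuthorizationRequest are the message types from the question, and the Rebus and Rebus.Msmq packages are assumed to be referenced.

```csharp
using Rebus.Activation;
using Rebus.Config;
using Rebus.Routing.TypeBased;

public static class ClientBus // hypothetical helper, not part of Rebus
{
    public static BuiltinHandlerActivator Start()
    {
        var activator = new BuiltinHandlerActivator();

        Configure.With(activator)
            // "publisher" is this endpoint's own input queue.
            .Transport(t => t.UseMsmq("publisher"))
            // Assumption: "subscriber1" is the input queue of the endpoint
            // that handles both request types, so that endpoint "owns" them.
            .Routing(r => r.TypeBased()
                .Map<MetersRequest>("subscriber1")
                .Map<CreditAuthorizationRequest>("subscriber1"))
            .Start();

        return activator;
    }
}
```

With routing like this, bus.Send(new MetersRequest()) should end up in a queue that an endpoint actually reads from. The service itself should not need mappings for the two request types, since it only receives them.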
Rebus (with MSMQ) ONLY creates input queues and error queues (by default, the error queue is called error). This means that you must have had an endpoint running using outgoingMetersRequests as its input queue at least once, or maybe you created the queue manually?
The error does not go away: the exception is just caught and passed to a continuation that runs on a threadpool thread, which you never get to see because you let go of the Task returned from Send.
I suggest you do this:
- Come up with good names for input queues. publisher and subscriber are too generic; you should pick names that correspond to each endpoint's responsibility or reason for existing in the first place.
- Be conscious about your message types. It seems you got this right, as you have named the events *Event, and your usage of request/reply looks right too.
- Use some kind of shared subscription storage, e.g. a central SQL Server. This way, if you configure the same centralized subscription storage in all of your publishers and subscribers, you do not need any endpoint mappings for your event types, which is pretty neat. You can read more on the wiki page about subscription storages in the section about the centralized type.
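As a rough sketch of the last suggestion, a subscriber configured with a centralized SQL Server subscription storage might look like the following. It assumes the Rebus.SqlServer package and its StoreInSqlServer configuration extension; the exact signature may differ between versions, so check it against the version you use. The table name "Subscriptions", the connectionString parameter, and the SubscriberBus/StartAsync names are hypothetical. PumpDownEvent and PosOnlineEvent are the event types from the question.

```csharp
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;

public static class SubscriberBus // hypothetical helper, not part of Rebus
{
    public static async Task<BuiltinHandlerActivator> StartAsync(string connectionString)
    {
        var activator = new BuiltinHandlerActivator();

        // Handlers would be registered on the activator here, as in the question.

        Configure.With(activator)
            .Transport(t => t.UseMsmq("subscriber1"))
            // Assumption: the publisher configures the same connection string
            // and table name, also with isCentralized: true.
            .Subscriptions(s => s.StoreInSqlServer(connectionString, "Subscriptions", isCentralized: true))
            .Start();

        // No Map<PumpDownEvent>("publisher") entries are needed here: the
        // subscription is written directly to the shared storage.
        await activator.Bus.Subscribe<PumpDownEvent>();
        await activator.Bus.Subscribe<PosOnlineEvent>();

        return activator;
    }
}
```

Mappings for request types that an endpoint sends with bus.Send would still be needed; the centralized storage only removes the mappings for events.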