Rebus Newbie Questions

1.4k views Asked by At

I've been evaluating different .Net messaging components and I'm having some trouble with Rebus using MSMQ that I haven't been able to resolve with docs, samples, or google.

I believe the only thing unique about my scenario is that my client can Send or Publish messages. Seems like it should be simple, but I'm having the following issues. Code snippets follow the issues.

Issues:

  1. MetersRequest not being handled. They are just piling up in outgoingMetersRequests queue.
  2. Getting an error saying that creditAuthRequests queue doesn't exist or I don't have permissions. I've confirmed the queue is not being created, but my assumption is that Rebus will ensure it's created just as the outgoingMetersRequests queue is being created. (Side note: The error goes away if I remove the 'await' from Bus.Send(...), but the queue is still not created and the messages are no where to be found.)

===========

FROM CLIENT

    private void InitializeBus()
    {
         _messageActivator = new BuiltinHandlerActivator();
         Configure.With(_messageActivator)
            .Transport(t => t.UseMsmq("publisher"))
            .Routing(r => r.TypeBased().Map<MetersRequest>("outgoingMetersRequests")
            .Map<CreditAuthorizationRequest>("creditAuthRequests"))
            .Start();
    }

    private async Task SendCreditAuthRequestAsync(int numberToSend)
    {
         var cardNumber = generateCardNumber();
         await _messageActivator.Bus.Send(new CreditAuthorizationRequest(cardNumber));
         await WriteOutputAsync($"Sent credit auth request for card {cardNumber}.");
    }

    private async Task SendMetersRequestAsync(int numberToSend)
    {
         await _messageActivator.Bus.Send(new MetersRequest());
         await WriteOutputAsync("Sent meters request.");
    }

=========== END CLIENT

============

FROM SERVICE

    private void InitializeBus()
    {
        _messageActivator = new BuiltinHandlerActivator();
        _messageActivator.Register<PosOnlineHandler>(() => new PosOnlineHandler(WriteOutputAsync));
        _messageActivator.Register<PumpDownHandler>(() => new PumpDownHandler(WriteOutputAsync));
        _messageActivator.Register<MetersRequestHandler>(() => new MetersRequestHandler(WriteOutputAsync, _messageActivator.Bus));
        _messageActivator.Register<CreditAuthorizationHandler>(() => new CreditAuthorizationHandler(WriteOutputAsync, _messageActivator.Bus));
        Configure.With(_messageActivator)
            .Transport(t => t.UseMsmq("subscriber1"))
            .Routing(r => r.TypeBased()
                .Map<PumpDownEvent>("publisher")
                .Map<PosOnlineEvent>("publisher")
                .Map<MetersRequest>("outgoingMetersRequests")
                .Map<CreditAuthorizationRequest>("creditAuthRequests"))
        .Start();
       _messageActivator.Bus.Subscribe<PumpDownEvent>().Wait();
       _messageActivator.Bus.Subscribe<PosOnlineEvent>().Wait();
    }

public class MetersRequestHandler : IHandleMessages<MetersRequest>
{
    private readonly Random _randomizer = new Random();
    private readonly Func<string, Task> _outputDelegate2;
    private readonly IBus _messageBus;

    public MetersRequestHandler(Func<string, Task> outputDelegate, IBus messageBus)
    {
        _outputDelegate2 = outputDelegate;
        _messageBus = messageBus;
    }

    public async Task Handle(MetersRequest message)
    {
        var pump = _randomizer.Next(20);
        var meters = _randomizer.Next();
        decimal dollars = (decimal)_randomizer.NextDouble();

        var response = new MetersResponse(pump, meters, dollars);
        await _outputDelegate2($"Sending MetersResponse: (Pump={pump}) (Meters={meters}) (Dollars ={dollars}");
        await _messageBus.Reply(response);
    }
}

============

1

There are 1 answers

5
mookid8000 On BEST ANSWER

There's a couple of things that seem a little off in the code you posted. I'll just comment on your issues, and then I will suggest a way forward :)

MetersRequest not being handled. They are just piling up in outgoingMetersRequests queue.

When you call .Routing(t => t.TypeBased().Map<SomeMessage>("someQueue")) you are saying that the SomeMessage type is "owned" by the Rebus endpoint with the input queue someQueue.

When you await bus.Send(yourMessage), Rebus will get the queue that owns the type of yourMessage and send the message there. This explains pretty well why your MetersRequests go to the outgoingMetersRequests queue.

However, you haven't posted any code that shows an endpoint that has the outgoingMetersRequests queue as its input queue. Someone will need to be processing messages off of that queue in order for something to happen.

The "input queue" is what you configure in the .Transport(t => t.UseMsmq("publisher")) part – in this case, the input queue is publisher.

Getting an error saying that creditAuthRequests queue doesn't exist or I don't have permissions.

Yeah – MSMQ error messages usually aren't the best ;)

I've confirmed the queue is not being created, but my assumption is that Rebus will ensure it's created just as the outgoingMetersRequests queue is being created.

Rebus (with MSMQ) ONLY creates input queues and error queues (which by default is called error). This means that you must have had an endpoint running using outgoingMetersRequests as its input queue at least once, or maybe you created the queue manually?

(Side note: The error goes away if I remove the 'await' from Bus.Send(...), but the queue is still not created and the messages are no where to be found.)

The error does not go away – the exception is just caught and passed to a continuation that runs on a threadpool thread which you never get to see because you let go of the Task returned from Send.

I suggest you do this:

  1. Come up with good names for input queues – publisher and subscriber are too generic, you should pick names that correspond to each endpoint's responsibility or reason for existing in the first place.

  2. Be conscious about your message types. It seems you got this right as you have named the events *Event, and your usage of request/reply looks right too.

  3. Use some kind of shared subscription storage, e.g. a central SQL Server. This way – if you configure the same centralized subscription storage in all of your publishers and subscribers – you do not need any endpoint mappings for your event types, which is pretty neat. You can read more on the wiki page about subscription storages in the section about the centralized type.