Rebus MSMQ Centralized Storage


When enabling centralized storage with MSMQ, is there anything else that needs to change? Here's what I have, but the code will not fully load. Works as expected if I disable centralized storage.

_messageActivator = new BuiltinHandlerActivator();
_messageActivator.Register<PosOnlineHandler>(() => new PosOnlineHandler(WriteOutputAsync));
_messageActivator.Register<PumpDownHandler>(() => new PumpDownHandler(WriteOutputAsync));
_messageActivator.Register<MetersRequestHandler>(() => new MetersRequestHandler(WriteOutputAsync, _messageActivator.Bus));
_messageActivator.Register<CreditAuthorizationHandler>(() => new CreditAuthorizationHandler(WriteOutputAsync, _messageActivator.Bus));
Configure.With(_messageActivator)
   .Transport(t => t.UseMsmq("consumerServiceQueue"))
   .Routing(r => r.TypeBased()
      .Map<PumpDownEvent>("publisherServiceQueue")
      .Map<PosOnlineEvent>("publisherServiceQueue")
      .Map<MetersResponse>("publisherServiceQueue")
      .Map<CreditAuthorizationResponse>("publisherServiceQueue"))
   .Subscriptions(s => s.StoreInSqlServer(@"Data Source=(localdb)\MSSQLLocalDB;Initial Catalog=test;Integrated Security=True;Connect Timeout=30;Encrypt=False;TrustServerCertificate=True;ApplicationIntent=ReadWrite;MultiSubnetFailover=False", "RebusSubscriptions", true, true))
   .Start();
_messageActivator.Bus.Subscribe<PumpDownEvent>().Wait();
_messageActivator.Bus.Subscribe<PosOnlineEvent>().Wait();

Thanks. Scott C.

mookid8000 (best answer)

Here's an explanation: Windows Forms (which you mentioned you are calling this code from) installs a synchronization context that insists on running the continuations of awaited Tasks on the UI thread.

This is pretty useful in a Windows Forms context, because controls may only be manipulated from that thread.

The problem arises when you call .Wait() (or .Result) on a Task, because that blocks the current thread until the Task has completed, including all of its continuations. Since the blocked thread is the UI thread, it is never available to run those continuations, and the result is a deadlock.

Here's another question that discusses this issue. As you can see, the issue is in no way specific to Rebus; it is simply a general trait of mixing async Tasks with Windows Forms (or WPF, or ASP.NET for that matter).

I suggest you investigate how to properly run async code during the initialization of a System.Windows.Forms.Form, so you can establish your subscriptions in the recommended way by awaiting the Tasks:

await bus.Subscribe<PumpDownEvent>();
await bus.Subscribe<PosOnlineEvent>();
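
One possible way to do this in a form is sketched below. It assumes the already started bus is handed to the form as an IBus and that the subscriptions can wait until the form's Load event fires. MainForm, OnLoadAsync and the two empty event classes are hypothetical stand-ins, not taken from the code in the question.

```csharp
using System;
using System.Windows.Forms;
using Rebus.Bus;

// Hypothetical stand-ins for the event types used in the question.
public class PumpDownEvent { }
public class PosOnlineEvent { }

// Hypothetical form; the bus is assumed to be configured and started elsewhere.
public class MainForm : Form
{
    private readonly IBus _bus;

    public MainForm(IBus bus)
    {
        _bus = bus;
        Load += OnLoadAsync; // run the subscriptions once the form has loaded
    }

    // Event handlers are the one place where "async void" is commonly accepted.
    private async void OnLoadAsync(object sender, EventArgs e)
    {
        try
        {
            // The UI thread is released at each await, so continuations can run on it.
            await _bus.Subscribe<PumpDownEvent>();
            await _bus.Subscribe<PosOnlineEvent>();
        }
        catch (Exception ex)
        {
            // Exceptions escaping an async void method cannot be caught by the caller.
            MessageBox.Show(this, ex.Message, "Subscription failed");
        }
    }
}
```

The try/catch is a design choice rather than a requirement: without it, a failing subscription would surface as an unhandled exception on the UI thread.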

Why does this issue only show when isCentralized: true is passed to the StoreInSqlServer configuration method?

Simply put, it's because nothing truly asynchronous actually happens when Rebus is configured to use decentralized subscription storage with MSMQ as the transport.

When running centralized, the subscriber registers itself as a subscriber in the subscription storage. With the configuration you are using, that means that an SqlConnection gets (asynchronously) opened, and then an ExecuteNonQueryAsync is (again asynchronously) executed, inserting a row in the [RebusSubscriptions] table.

When running decentralized, the subscriber registers itself as a subscriber by sending the publisher a SubscribeRequest. With MSMQ this operation just happens to execute synchronously without awaiting a single thing, because MSMQ does not have an API that returns Tasks.
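
The difference can be illustrated without Rebus or MSMQ. In the sketch below, both method names are hypothetical stand-ins: one mimics a send that finishes synchronously, the other mimics real asynchronous I/O. Only the second hands back a Task that is still pending, and a pending Task is what makes .Wait() on the UI thread dangerous.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Hypothetical stand-in for the MSMQ send: nothing incomplete is ever awaited,
    // so the method runs to its end synchronously and returns a finished Task.
    public static async Task SendLikeMsmqAsync()
    {
        await Task.CompletedTask;
    }

    // Hypothetical stand-in for a truly asynchronous operation (such as the SQL Server calls):
    // the await yields, and the rest of the method becomes a continuation.
    public static async Task SendLikeSqlAsync()
    {
        await Task.Delay(200);
    }

    public static void Main()
    {
        Task msmqLike = SendLikeMsmqAsync();
        Task sqlLike = SendLikeSqlAsync();

        // Already complete: Wait() would return at once, with no continuation left to schedule.
        Console.WriteLine($"MSMQ-like completed on return: {msmqLike.IsCompleted}");

        // Normally still pending here: on a UI thread, Wait() would block the thread the continuation needs.
        Console.WriteLine($"SQL-like completed on return: {sqlLike.IsCompleted}");

        // Blocking is harmless in a console app, which has no UI synchronization context.
        sqlLike.Wait();
    }
}
```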

If you had used SQL Server, Azure Service Bus, Amazon SQS, etc. as the transport, there would have been some asynchronous stuff going down when sending the SubscribeRequest, and then you would have gotten the same deadlock.

I hope that explains it :)