I am attempting to perform request / reply with Rebus but also using the Windsor container NuGet package: https://github.com/rebus-org/Rebus.CastleWindsor
After looking at the following samples:
https://github.com/rebus-org/RebusSamples/tree/master/RequestReply
https://github.com/rebus-org/RebusSamples/tree/master/Integration
I have cobbled this example below together:

    public class ContactModule : NancyModule
    {
        public ContactModule(IBus bus)
        {
            Get["/v1/contact"] = parameters =>
            {
                var contacts = new List<Contact>();
                using (var activator = new BuiltinHandlerActivator())
                {
                    var settings = new Settings();
                    activator.Handle<GetContactsResponse>(response =>
                    {
                        contacts = response.Contacts;
                        return Task.FromResult(0);
                    });
                    Configure.With(activator)
                        .Logging(l => l.ColoredConsole(LogLevel.Warn))
                        .Transport(t => t.UseRabbitMq(
                            settings.RabbitMQConnectionString,
                            settings.OutputQueueName)) // we listen for messages in the output queue
                        .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName)) // but we publish to the input queue
                        .Options(o =>
                        {
                            o.EnableCompression();
                            o.EnableEncryption(settings.MessageEncryptionKey);
                        })
                        .Start();
                    activator.Bus.Publish(new GetContactsRequest()).Wait();
                }
                return Response.AsJson(contacts);
            };
        }
    }

I know it should probably look more like the below when using the Windsor container approach, but I do not know how to listen for the reply:

    public class ContactModule : NancyModule
    {
        public ContactModule(IBus bus)
        {
            Get["/v1/contact"] = parameters =>
            {
                var contacts = new List<Contact>();
                bus.Publish(new GetContactsRequest()).Wait();
                // How do I listen for the reply?
                return Response.AsJson(contacts);
            };
        }
    }

My message handler:

    public class GetContactsHandler : IHandleMessages<GetContactsRequest>
    {
        private readonly IBus _bus;
        private readonly Settings _settings;

        public GetContactsHandler(IBus bus, Settings settings)
        {
            _bus = bus;
            _settings = settings;
        }

        public async Task Handle(GetContactsRequest request)
        {
            // Fetch contacts from db...
            var contacts = new List<Contact>();
            await _bus.Reply(new GetContactsResponse {Contacts = contacts});
        }
    }

My Rebus Windsor installer:

    public class RebusInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {
            var settings = new Settings();
            Configure.With(new CastleWindsorContainerAdapter(container))
                .Logging(l => l.ColoredConsole(LogLevel.Warn))
                .Transport(t => t.UseRabbitMqAsOneWayClient(
                    settings.RabbitMQConnectionString))
                .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName))
                .Options(o =>
                {
                    o.EnableCompression();
                    o.EnableEncryption(settings.MessageEncryptionKey);
                })
                .Start();
        }
    }

The problem I am facing is that I want to use the request / reply pattern in my web api to request a list of contacts, wait for the reply containing the retrieved contacts and return them to the api caller.
But if I want to use the Windsor container adapter for Rebus, the IHandlerActivator interface does not expose the .Handle method that allows registering inline message handlers, which is where I grab the contacts from the reply before sending them back to the API caller.
Is there a way to do this, or am I approaching the problem incorrectly?
Edit: As you can see in the first example, I am injecting the IBus interface from the Windsor container. But if I use the injected bus, how do I tell it to listen for the reply coming back from the message handler?
Update: Rebus.Async was exactly what I was looking for: https://github.com/rebus-org/Rebus.Async
When you are using a real IoC container, you should create a class that implements IHandleMessages<TMessage>. This scales much better as you add handlers and assisting application and domain services to your program.
This of course means that you must bring in the Rebus.CastleWindsor NuGet package and pass a CastleWindsorContainerAdapter to Rebus' Configure.With(...) method.
When you need to register handlers in Windsor, you can take advantage of the extension methods from Rebus.CastleWindsor, which take care of registering your handler types in the container.
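As a hedged sketch of what that registration could look like: the installer below uses `RegisterHandler` and `AutoRegisterHandlersFromAssemblyOf`, which I believe are the extension methods Rebus.CastleWindsor provides, so check the names and the namespace against the version you have installed. `HandlerInstaller` is a hypothetical name, and `GetContactsHandler` is the handler from the question.

```csharp
using Castle.MicroKernel.Registration;
using Castle.MicroKernel.SubSystems.Configuration;
using Castle.Windsor;
using Rebus.CastleWindsor; // assumed namespace of the handler registration extensions

// Hypothetical installer that only registers message handlers.
public class HandlerInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        // Register one specific handler type...
        container.RegisterHandler<GetContactsHandler>();

        // ...or, instead of the line above, scan a whole assembly for
        // IHandleMessages<T> implementations:
        // container.AutoRegisterHandlersFromAssemblyOf<GetContactsHandler>();
    }
}
```

Only one of the two calls is active, so the same handler is not registered twice.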
UPDATE after you added some more info: you asked whether you are approaching the problem incorrectly, and I would say "yes", because a service bus is not very good at synchronous request/reply. You should use something "request/reply-oriented" for that, e.g. HTTP(*).
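Also, I know almost nothing about Nancy, but I am pretty sure that calling .Wait() on the task returned by bus.Publish(...) will deadlock, at least if you are hosting Nancy in ASP.NET.
You should always await when you can, and Nancy lets you declare the route handler as async so that you can do so.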
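A minimal sketch of an awaited publish, assuming Nancy 1.x, where passing `true` as the second indexer argument declares an asynchronous route. The module mirrors the second example from the question; it only removes the blocking `.Wait()` and does not by itself receive the reply.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Nancy;
using Rebus.Bus;

public class ContactModule : NancyModule
{
    public ContactModule(IBus bus)
    {
        // The "true" flag marks this route as async (Nancy 1.x syntax, an assumption here).
        Get["/v1/contact", true] = async (parameters, cancellationToken) =>
        {
            var contacts = new List<Contact>();

            // Await the publish instead of blocking the request thread with .Wait().
            await bus.Publish(new GetContactsRequest());

            // The reply has not been received at this point, so the list is still empty.
            return Response.AsJson(contacts);
        };
    }
}
```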
PS: Last thing: Remember to keep the container around as long as you want your endpoint to process messages.
In the code you posted, you dispose the BuiltinHandlerActivator right after publishing the event, which means that you will most likely not get to process any messages at all.
If you use Windsor, the bus will be disposed when you dispose the Windsor container.
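To illustrate the lifetime point, here is a sketch that assumes the endpoint runs as a standalone console process (in a Nancy application the container would normally be owned by the bootstrapper instead). `Program` is a hypothetical host; `RebusInstaller` is the installer from the question.

```csharp
using System;
using Castle.Windsor;

public static class Program
{
    public static void Main()
    {
        // The container, and with it the bus, stays alive until the using block ends.
        using (var container = new WindsorContainer())
        {
            container.Install(new RebusInstaller());

            Console.WriteLine("Endpoint running. Press ENTER to quit.");
            Console.ReadLine();
        } // Disposing the container here also disposes the bus.
    }
}
```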
(*) Although Rebus does actually have Rebus.Async which is a request/response API that could be used in your scenario.
I would not recommend going down this road though. You should probably only do this if you intend to do it this once, and then never again :)
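For completeness, a rough sketch of how Rebus.Async could be wired into the code from the question. It assumes the Rebus.Async NuGet package is installed and that its `EnableSynchronousRequestReply` and `SendRequest` extension methods are in scope; the using directives and the other options from the question are omitted for brevity. These are two fragments, not a complete program.

```csharp
// 1) In the installer: enable the feature. The endpoint needs its own input queue
//    to receive replies, so this sketch uses UseRabbitMq instead of the one-way client.
Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.UseRabbitMq(
        settings.RabbitMQConnectionString,
        settings.OutputQueueName))
    .Routing(r => r.TypeBased().MapAssemblyOf<GetContactsRequest>(settings.InputQueueName))
    .Options(o => o.EnableSynchronousRequestReply())
    .Start();

// 2) In the Nancy module: send the request and await the correlated reply.
Get["/v1/contact", true] = async (parameters, cancellationToken) =>
{
    var reply = await bus.SendRequest<GetContactsResponse>(new GetContactsRequest());
    return Response.AsJson(reply.Contacts);
};
```

The existing `GetContactsHandler` should not need to change, since it already answers with `_bus.Reply(...)`.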