Qpid - Load balancing messages to consumers that have different performances characteristics

376 views Asked by At

I have the following scenario :

I’m pushing 100 messages to a queue shared by 2 consumers. Both subscribers subscribe to the queue in pre-acquire mode and explicit mode. After each message is handled, each subscriber accepts the message to remove it from the queue. The pseudo codes look likes that :

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    Session.MessageAccept(message)

The messages are load balanced correctly, each message is processed once and only once, but we discovered that it doesn’t take into account the processing time for each consumer. For instance, let’s assume that consumer A is taking 50ms to process a message and consumer B is taking 5seconds. Ideally, consumer B should start processing 1 message and in the meantime, consumer A should process the 99 others. However, what happens is that consumer B will actually process 25 messages in 50 seconds while consumer A will process the 75 others in ~4 seconds and will idle. The client api seems to prefetch the messages, which is clearly non optimal in this situation.

How can we solve this problem?

We’re using Qpid cpp 0.5 and the fully managed c# 0-10 client API, not the cpp bindings (but my understanding is that this behavior is not linked to the implementation of the API)

Regards,

Julien

1

There are 1 answers

0
Julien On BEST ANSWER

This behaviour can be changed by configuring the message flow for the queue. To avoid prefetching of messages, it should be set to a credit of 1 message and updated after handling each message. The pseudo code would looks like that :

InitSubscription(queue) : 
    MessageSubscribe(queue, AcceptMode.Explicit, AcquireMode.PreAcquired)
    MessageSetFlowMode(queue, FlowMode.Credit)
    MessageFlow(queue, CreditUnit.Byte, MAX_BYTES)
    MessageFlow(queue, CreditUit.Message, 1) // will disable prefetch

OnMessageTransfer(message) :
    DoSomethingWithMessage(message)
    MessageAccept(message)
    MessageFlow(queue, CreditUit.Message, 1) // reissue a credit for 1 and only 1 message