How to achieve concurrent processing of messages from different Kafka partitions on the same topic

Quarkus version = 2.16.5.Final

I am trying to process messages concurrently when they are in different partitions of the same topic. I also want explicit control over the commit of each message, and I do not want to commit until the message has been processed. Since the messages are distributed across partitions by key, I believed this would be easy to achieve.

However, I am finding that messages are processed serially: a message on a different partition is not picked up until the message currently being processed has been committed. Why is this, and how do I resolve it?

My code:

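import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.annotations.Blocking;

@ApplicationScoped
public class Consumer {

  @Incoming("chat-in")
  @Acknowledgment(Strategy.MANUAL)
  @Blocking("worker-pool")
  public CompletionStage<Void> consume(Message<String> message) throws InterruptedException {
    // consumeMessage(...) is my own processing method (body omitted here)
    return consumeMessage(message);
  }
}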

My properties are set as follows:

mp.messaging.incoming.chat-in.connector=smallrye-kafka
mp.messaging.incoming.chat-in.group.id=chat-processor
mp.messaging.incoming.chat-in.key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
mp.messaging.incoming.chat-in.topic=chat
mp.messaging.incoming.chat-in.partitions=3
mp.messaging.incoming.chat-in.commit-strategy=latest
mp.messaging.incoming.chat-in.enable.auto.commit=false
smallrye.messaging.worker.worker-pool.max-concurrency=3

As you can see above, I have set the partitions property to 3 and the max-concurrency of the worker pool to 3 as well.

Also, when I change the annotation from @Blocking("worker-pool") to @Blocking(value = "worker-pool", ordered = false), messages within the same partition are consumed concurrently, i.e. the order within a partition is no longer respected, which is not what I want at all. I would like concurrent processing across partitions, with order preserved within each partition.
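The behaviour described above (parallel across partitions, strictly ordered within a partition) can be illustrated independently of Quarkus. The following is a minimal plain-Java sketch under stated assumptions, not how SmallRye Reactive Messaging implements @Blocking: the class PartitionOrderedDispatcher and its methods are hypothetical names introduced only for illustration, and partitions and offsets are simulated with plain integers rather than real Kafka records.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Quarkus or SmallRye): one single-threaded
// "lane" per partition. Tasks for the same partition run one after another in
// submission order; tasks for different partitions can run in parallel.
public class PartitionOrderedDispatcher {

    private final ExecutorService[] lanes;

    public PartitionOrderedDispatcher(int partitions) {
        lanes = new ExecutorService[partitions];
        for (int i = 0; i < partitions; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Queue a task on the lane that belongs to the given partition.
    public void dispatch(int partition, Runnable task) {
        lanes[partition].execute(task);
    }

    // Stop accepting work and wait for every lane to drain.
    public boolean shutdownAndWait(long timeoutSeconds) throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        boolean allDone = true;
        for (ExecutorService lane : lanes) {
            allDone &= lane.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        }
        return allDone;
    }

    // Simulates 3 partitions with 5 offsets each and records the order in
    // which each partition's offsets were processed.
    public static Map<Integer, List<Integer>> demo() throws InterruptedException {
        PartitionOrderedDispatcher dispatcher = new PartitionOrderedDispatcher(3);
        Map<Integer, List<Integer>> processed = new ConcurrentHashMap<>();
        for (int offset = 0; offset < 5; offset++) {
            for (int p = 0; p < 3; p++) {
                final int partition = p;
                final int off = offset;
                dispatcher.dispatch(partition, () ->
                    processed.computeIfAbsent(partition, k -> new CopyOnWriteArrayList<>()).add(off));
            }
        }
        dispatcher.shutdownAndWait(5);
        return processed;
    }
}
```

In this sketch, ordering comes from each lane having exactly one thread, while concurrency comes from having one lane per partition. A shared pool of three threads without such per-partition lanes would give the concurrency but not the ordering, which matches what ordered = false appears to do in the observation above.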
