I'm trying to add some control variables to a Quarkus 3.2 / SmallRye-based Kafka consumer app. I have some code like this:
@ApplicationScoped
public class MyConsumer {

    private static Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @Inject
    ManagedExecutor managedExecutor;

    @Inject
    @RestClient
    MyInterface myEndpoint;

    // other interface definitions for different scenarios...
    @Inject
    @RestClient
    MyOtherInterface myOtherEndpoint;

    @Inject
    ObjectMapper mapper;

    /**
     * Method which allows to pick message from "notification" channel and process
     * accordingly.
     *
     * @param message
     * @return CompletionStage<Void>
     */
    @Incoming("incoming-messages")
    @Blocking(ordered = false)
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consumeFromMyTopic(Message<String> message) {
        return CompletableFuture.supplyAsync(() -> processMessage(message), managedExecutor);
    }
    private Void processMessage(Message<String> message) {
        IncomingKafkaRecordMetadata<String, String> metadata = message
                .getMetadata(IncomingKafkaRecordMetadata.class)
                .orElseThrow(() -> new IllegalStateException("Metadata not found in the message"));
        String payload = message.getPayload();
        // ....
        // logic to transform my event into a required Request to call myInterface
        Request request = mapper.readValue(payload, Request.class);
        callEndpoint(request);
        // ....
        message.ack();
        return null; // Void methods must return null explicitly
    }
}
As you can see, I'm using a managed executor to spread the work across several threads and get somewhat better processing performance.
However, under certain circumstances (such as occasionally slow response times), I have observed Vert.x blocked-thread errors for different reasons. In those scenarios the records run into some kind of maximum processing age limit, and the consumer sometimes stops (I'm using the throttled commit strategy).
For this kind of implementation, is there any way to enforce or create something like a maximum "timeout" parameter for how long a thread may execute?
I tried setting the property quarkus.vertx.max-worker-execute-time, but in all the tests I ran (even with very low values) it apparently has no effect.
I'm currently researching whether there is another option suitable for this use case.
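Maybe you can try to implement this with Mutiny, because it supports timeouts and fallbacks.
You could try something like this: wrap the processing in a Uni, fail it with ifNoItem().after(...).fail() when no result arrives in time, and retry a limited number of times with onFailure().retry().atMost(...).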
When the retry fails, your message should automatically not be acknowledged (it is nacked instead), so it is not marked as processed. Whether your consumer then tries to consume the message again depends on the failure-strategy configured for the channel.
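If you would rather keep the CompletableFuture you already have, the plain JDK (Java 9+) offers a similar timeout through CompletableFuture.orTimeout. Below is a minimal, standalone sketch of that idea, not a drop-in fix: TimeoutSketch and withTimeout are hypothetical names made up for illustration, and the fixed thread pool only stands in for your ManagedExecutor.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper; the class and method names are illustrative only.
final class TimeoutSketch {

    // Runs the work on the given executor and fails the returned future with a
    // TimeoutException if no result arrives within the limit (requires Java 9+).
    static <T> CompletableFuture<T> withTimeout(Supplier<T> work, Executor executor, Duration limit) {
        return CompletableFuture.supplyAsync(work, executor)
                .orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Stand-in for the injected ManagedExecutor (assumption for this demo).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Fast work completes normally.
            System.out.println(withTimeout(() -> "done", pool, Duration.ofSeconds(2)).join());

            // Slow work: the future fails after 100 ms while the task itself keeps running.
            try {
                withTimeout(() -> { sleep(1_000); return "late"; }, pool, Duration.ofMillis(100)).join();
            } catch (CompletionException e) {
                System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
            }
        } finally {
            pool.shutdownNow();
        }
    }

    // Simulates a slow downstream call.
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In your consumer this would amount to appending .orTimeout(...) to the supplyAsync(...) call. Two caveats: orTimeout only completes the future exceptionally and does not interrupt the worker thread, so a hung REST call still occupies that thread (a read timeout on the REST client itself is probably needed as well). And since you use manual acknowledgment, the timeout path would presumably have to call message.nack(...) itself.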
I hope this helps you.
For more info on Mutiny & timeouts, check this link:
https://smallrye.io/smallrye-mutiny/2.5.3/guides/handling-timeouts/