Smallrye/Quarkus - Any way to control the Max execution time to avoid Blocking and processing errors?


I'm trying to add some control variables to a Quarkus 3.2/SmallRye-based Kafka consumer app. I have some code like this:

@ApplicationScoped
public class MyConsumer {

    private static Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @Inject
    ManagedExecutor managedExecutor;

    @Inject
    @RestClient
    MyInterface myEndpoint;

    // other interface definitions for different scenarios...
    @Inject
    @RestClient
    MyOtherInterface myOtherEndpoint;

    @Inject
    ObjectMapper mapper;

    /**
     * Method which allows to pick message from "notification" channel and process
     * accordingly.
     * 
     * @param message
     * @return CompletionStage<Void>
     */
    @Incoming("incoming-messages")
    @Blocking(ordered = false)
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consumeFromMyTopic(Message<String> message) {
        return CompletableFuture.supplyAsync(() -> processMessage(message), managedExecutor);

    }

    private Void processMessage(Message<String> message) {
        IncomingKafkaRecordMetadata<String, String> metadata = message
                .getMetadata(IncomingKafkaRecordMetadata.class)
                .orElseThrow(() -> new IllegalStateException("Metadata not found in the message"));

        String payload = message.getPayload();

        // ....
        // logic to transform my event into a required Request to call myInterface

        Request request = mapper.readValue(payload, Request.class);
        callEndpoint(request);

        // ....
        message.ack();
        // Void has no instances, so null is the only value that can be returned
        return null;
    }
}

As you can see, I'm using a managed executor to process messages on several threads and get somewhat better throughput.

However, under certain circumstances (such as occasional slow response times), I observe Vert.x blocked-thread errors with various causes. In those scenarios the records seem to hit some kind of maximum processing age, and the consumer sometimes stops (under the throttled commit strategy).

For this kind of implementation, is there any way to enforce something like a maximum "timeout" for how long a thread may execute?

I tried setting the property quarkus.vertx.max-worker-execute-time, but in all my tests (even with very low values) it apparently has no effect.

I'm currently researching whether there is another option suitable for this use case.

1 Answer

Answer by Serkan:

Maybe you can try to implement this with Mutiny, because it has support for timeouts & fallbacks.

You could try something like this:

@RegisterRestClient(configKey = "myinterface")
public interface MyInterface {

    @POST
    Uni<Response> callEndpoint(JsonObject payload);
}

    @Inject
    ObjectMapper objectMapper;

    @RestClient
    MyInterface restClient;

    @Incoming("incoming-messages")
    public Uni<Void> consumeFromMyTopic(Message<String> message) throws JsonProcessingException {
        var payload = this.objectMapper.readValue(message.getPayload(), JsonObject.class);

        return restClient.callEndpoint(payload)
                .onFailure()
                .retry()
                .withBackOff(Duration.ofSeconds(5))
                // expireIn takes the retry deadline in milliseconds
                .expireIn(Duration.ofSeconds(20).toMillis())
                // drop the Response so the type matches Uni<Void>
                .replaceWithVoid();
    }

quarkus.rest-client.myinterface.connect-timeout=5000

When the retries fail, the message should automatically be left unacknowledged, so it stays on the topic and your consumer will try to consume it again.

I hope this helps you.

For more info on Mutiny & timeouts, check this link:

https://smallrye.io/smallrye-mutiny/2.5.3/guides/handling-timeouts/
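
As a complement, if you keep the CompletableFuture-based approach from the question, the JDK (9 and later) has a timeout of its own: CompletableFuture.orTimeout. Below is a minimal sketch in plain Java, not Quarkus-specific. TimeoutSketch, supplyWithTimeout, slowCall and timedOut are hypothetical names made up for illustration, and slowCall merely stands in for the REST call. Keep in mind that orTimeout only fails the future with a TimeoutException; it does not interrupt the thread that is still running the task.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class TimeoutSketch {

    /**
     * Runs the task on the given executor and fails the returned future with a
     * TimeoutException if no result arrives within maxDuration.
     * The worker thread itself is NOT interrupted; only the future is completed.
     */
    public static <T> CompletableFuture<T> supplyWithTimeout(
            Supplier<T> task, ExecutorService executor, Duration maxDuration) {
        return CompletableFuture.supplyAsync(task, executor)
                .orTimeout(maxDuration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Hypothetical stand-in for a REST call that takes delayMillis to answer. */
    public static String slowCall(long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return early
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    /** Returns true if the future failed with a TimeoutException, false otherwise. */
    public static boolean timedOut(long callMillis, long limitMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            supplyWithTimeout(() -> slowCall(callMillis), executor,
                    Duration.ofMillis(limitMillis)).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the original failure in a CompletionException
            return e.getCause() instanceof TimeoutException;
        } finally {
            // Interrupts any task that is still running after the timeout
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast call timed out: " + timedOut(10, 2000));
        System.out.println("slow call timed out: " + timedOut(2000, 100));
    }
}
```

In the consumer from the question, the same orTimeout(...) call could presumably be chained onto the supplyAsync(...) result, with the message nacked when the future fails. That part is an assumption and is not tested against a ManagedExecutor here.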