Unexpected Behaviour while testing Smallrye Kafka Consumer in Quarkus 3.2.6


I'm implementing a Kafka consumer that processes messages on parallel threads. The processing involves API calls made through the reactive REST client, after which I ack() the message.

My problem is mostly with unit testing: the tests behave unpredictably and fail, especially when I try to simulate the HTTP calls.

The failures vary between:

Caused by: io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Subclass.callEndpoint(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.callEndpoint(Unknown Source)
        at com.services.smallrye.kafka.MyConsumer.callEndpoint(KafkaConsumer.java:339)
        ... 13 more
Caused by: java.lang.InterruptedException
        at io.smallrye.faulttolerance.core.retry.Retry.doApply(Retry.java:96)
        at io.smallrye.faulttolerance.core.retry.Retry.apply(Retry.java:42)
        at io.smallrye.faulttolerance.FaultToleranceInterceptor.syncFlow(FaultToleranceInterceptor.java:255)
        at io.smallrye.faulttolerance.FaultToleranceInterceptor.intercept(FaultToleranceInterceptor.java:182)
        at io.smallrye.faulttolerance.FaultToleranceInterceptor_Bean.intercept(Unknown Source)

Caused by: java.lang.NullPointerException
        at io.quarkus.restclient.config.RestClientsConfig.getInstance(RestClientsConfig.java:323)
        at io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.<init>(RestClientCDIDelegateBuilder.java:46)
        at io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.createDelegate(RestClientCDIDelegateBuilder.java:42)
        at io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.delegate(RestClientReactiveCDIWrapperBase.java:76)
        at io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.<init>(RestClientReactiveCDIWrapperBase.java:30)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper.<init>(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Subclass.<init>(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.doCreate(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.create(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:113)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:37)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:34)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:34)
        at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.arc$delegate(Unknown Source)
        at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.callPushSender(Unknown Source)
        at com.services.smallrye.kafka.MyConsumer.callEndpoint(MyConsumer.java:339)

Sometimes I also get a third error saying the REST client is closed, but it only happens randomly after running the test several times.

This is basically what I have in my consumer class:

@ApplicationScoped
public class MyConsumer {

    private static Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @Inject
    ManagedExecutor managedExecutor;

    @Inject
    @RestClient
    MyInterface myEndpoint;

    // other interface definitions for different scenarios...
    @Inject
    @RestClient
    MyOtherInterface myOtherEndpoint;

    @Inject
    ObjectMapper mapper;

    /**
     * Method which allows to pick message from "notification" channel and process
     * accordingly.
     * 
     * @param message
     * @return CompletionStage<Void>
     */
    @Incoming("incoming-messages")
    @Blocking(ordered = false)
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consumeFromMyTopic(Message<String> message) {
        return CompletableFuture.supplyAsync(() -> processMessage(message), managedExecutor);

    }

   private Void processMessage(Message<String> message) {
        try {
            IncomingKafkaRecordMetadata<String, String> metadata = message
                    .getMetadata(IncomingKafkaRecordMetadata.class)
                    .orElseThrow(() -> new IllegalStateException("Metadata not found in the message"));

            String payload = message.getPayload();

            ....
            // logic to transform my event into the required Request to call myInterface

            Request request = mapper.readValue(payload, Request.class);
            callEndpoint(request);

            ....
            message.ack();

    }

.....
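
For reference, the "process on an executor, then ack" flow described above can be sketched with the JDK alone. This is only an illustration under assumptions: `FakeMessage`, `process` and `run` are hypothetical stand-ins (the real `Message` type also exposes `ack()` returning a `CompletionStage<Void>`), and it is not the actual consumer code. The point it shows is returning a stage that completes only after the ack (or nack) has completed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AckSketch {

    /** Hypothetical stand-in for a message; it only records ack/nack calls. */
    static class FakeMessage {
        final String payload;
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        FakeMessage(String payload) {
            this.payload = payload;
        }

        CompletionStage<Void> ack() {
            events.add("ack");
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> nack(Throwable reason) {
            events.add("nack");
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Hypothetical processing step; fails on an empty payload. */
    static void process(FakeMessage message) {
        if (message.payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    /** Runs the processing on the executor, then acks on success or nacks on failure. */
    static CompletionStage<Void> consume(FakeMessage message, Executor executor) {
        return CompletableFuture.runAsync(() -> process(message), executor)
                .handle((ok, failure) -> failure == null ? message.ack() : message.nack(failure))
                .thenCompose(stage -> stage); // flatten so the result completes after ack/nack
    }

    /** Helper for trying the sketch: returns the calls recorded for one payload. */
    static List<String> run(String payload) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FakeMessage message = new FakeMessage(payload);
            consume(message, executor).toCompletableFuture().join();
            return new ArrayList<>(message.events);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape, `AckSketch.run("hello")` records a single ack and `AckSketch.run("")` records a single nack, and a caller that waits on the returned stage knows the processing has finished.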

My tests look like the following. I use Mockito and the @InjectMock annotation to simulate several scenarios, such as the expected responses for success and error cases. I use the in-memory connector to avoid connecting to Kafka or using Testcontainers:

@QuarkusTest
@QuarkusTestResource(KafkaTestResource.class)
class MyConsumerTest {

  @Inject
  @Any
  InMemoryConnector connector;

  @InjectMock
  @RestClient
  MyInterface myInterface;


  private InMemorySource<Message<String>> incoming;

  // sink for any error sent to the outgoing channel
  private InMemorySink<String> outgoing;
  private IncomingKafkaRecordMetadata mockMetadata;

  @BeforeEach
  public void setup() {
    incoming = connector.source("incoming-messages");
    outgoing = connector.sink("producer");
    mockMetadata = Mockito.mock(IncomingKafkaRecordMetadata.class);
    //...mock metadata for incoming kafka record

    when(myInterface.callEndpoint(any(Request.class)))
        .thenReturn(new Response()......));

  }

  @Test
  void testValidMessage() throws JsonProcessingException {
    Event event = TestUtils.generateEvent();

    ObjectMapper mapper = new ObjectMapper();

    Metadata metadata = Metadata.of(mockMetadata);
    Message<String> message = Message.of(mapper.writeValueAsString(event), metadata);

    incoming.send(message);

    await().atMost(40, TimeUnit.SECONDS).<List<? extends Message<String>>>until(outgoing::received, t -> t.size() == 0);

    // Temporary assertion because of the CompletableFuture issue in the test.
    Assertions.assertEquals(0, outgoing.received().size());
  }
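
One thing that may be worth checking (an assumption, not something confirmed for this setup): the await condition above, `t.size() == 0`, is already true when the sink starts empty, so the test method can return while the asynchronous processing is still running. The JDK-only sketch below illustrates that effect and a wait on an explicit completion signal instead. The class and method names are hypothetical, and the list only stands in for `outgoing.received()`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitSketch {

    /**
     * Returns true when the "size == 0" check passed while the background
     * work was still running, and the work then finished once it was awaited.
     */
    static boolean emptyCheckPassesWhileWorkIsRunning() {
        List<String> received = new CopyOnWriteArrayList<>(); // stand-in for the sink, starts empty
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        executor.execute(() -> {
            try {
                release.await(); // simulated slow processing, e.g. the HTTP call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        // Same shape as the test's condition: true right away, nothing was waited for.
        boolean conditionAlreadyTrue = received.size() == 0;
        boolean workStillRunning = done.getCount() == 1;

        release.countDown(); // let the simulated processing finish
        boolean finished;
        try {
            // Waiting on an explicit completion signal instead of an empty list.
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        } finally {
            executor.shutdown();
        }
        return conditionAlreadyTrue && workStillRunning && finished;
    }

    public static void main(String[] args) {
        System.out.println(emptyCheckPassesWhileWorkIsRunning());
    }
}
```

In the real test, a comparable completion signal could be a Mockito verification with a timeout, for example `Mockito.verify(myInterface, Mockito.timeout(5000)).callEndpoint(any(Request.class))`, followed by the assertion on the sink. Whether that removes the failures here is untested.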


Just for reference, I'm also using fault tolerance in my interface definitions:

@RegisterRestClient
public interface MyInterface {

    @POST
    @Path("/mypath/...")
    @Retry(maxRetries = 2)
    ResponseType callEndpoint(Request request);
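
As background for the InterruptedException in the first stack trace, here is a simplified, JDK-only sketch of a retry loop that waits between attempts. It is not SmallRye Fault Tolerance's implementation, and `retry` and `failureSeenByWorker` are hypothetical names. It only shows the general mechanism: if the thread running the retry is interrupted while waiting (for example because an executor is shut down when a test ends, which is an assumption about this case), the wait fails with an InterruptedException instead of the original error.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

public class RetryInterruptSketch {

    /** Simplified retry loop: one initial attempt plus up to maxRetries retries. */
    static <T> T retry(Callable<T> action, int maxRetries, long delayMillis) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                throw e; // never retry an interruption
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
            if (attempt < maxRetries) {
                // Throws InterruptedException if this thread has been interrupted.
                Thread.sleep(delayMillis);
            }
        }
        throw last;
    }

    /** Runs an always-failing call on a worker thread, interrupts it, and reports what it threw. */
    static String failureSeenByWorker() {
        AtomicReference<String> seen = new AtomicReference<>("none");
        Thread worker = new Thread(() -> {
            try {
                retry(() -> {
                    throw new IllegalStateException("endpoint unavailable");
                }, 2, 60_000);
            } catch (Exception e) {
                seen.set(e.getClass().getSimpleName());
            }
        });
        worker.start();
        worker.interrupt(); // simulates a shutdown while the retry is waiting
        try {
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(failureSeenByWorker());
    }
}
```

Here the worker reports the interruption rather than the IllegalStateException thrown by the call itself, which is the same shape as the first trace, where the InterruptedException surfaces from inside the retry.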

Is there anything I'm not considering, or anything missing, to make this work as expected?

What I've tried: testing without the fault tolerance annotation, and using WireMock to simulate the HTTP calls. Both produced errors very similar to the ones above, with no positive results.
