I'm implementing a Kafka consumer that processes messages on parallel threads. The processing involves some API calls made through the reactive REST client, after which I ack() the message.
The issue I'm having is mostly with unit testing: the tests behave unexpectedly and fail, especially when I try to simulate the HTTP calls.
The failures vary between:
Caused by: io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Subclass.callEndpoint(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.callEndpoint(Unknown Source)
    at com.services.smallrye.kafka.MyConsumer.callEndpoint(KafkaConsumer.java:339)
    ... 13 more
Caused by: java.lang.InterruptedException
    at io.smallrye.faulttolerance.core.retry.Retry.doApply(Retry.java:96)
    at io.smallrye.faulttolerance.core.retry.Retry.apply(Retry.java:42)
    at io.smallrye.faulttolerance.FaultToleranceInterceptor.syncFlow(FaultToleranceInterceptor.java:255)
    at io.smallrye.faulttolerance.FaultToleranceInterceptor.intercept(FaultToleranceInterceptor.java:182)
    at io.smallrye.faulttolerance.FaultToleranceInterceptor_Bean.intercept(Unknown Source)
Caused by: java.lang.NullPointerException
    at io.quarkus.restclient.config.RestClientsConfig.getInstance(RestClientsConfig.java:323)
    at io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.<init>(RestClientCDIDelegateBuilder.java:46)
    at io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.createDelegate(RestClientCDIDelegateBuilder.java:42)
    at io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.delegate(RestClientReactiveCDIWrapperBase.java:76)
    at io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.<init>(RestClientReactiveCDIWrapperBase.java:30)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper.<init>(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Subclass.<init>(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.doCreate(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.create(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_Bean.create(Unknown Source)
    at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:113)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:37)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:34)
    at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
    at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
    at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:34)
    at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.arc$delegate(Unknown Source)
    at com.services.smallrye.interfaces.MyInterface$$CDIWrapper_ClientProxy.callPushSender(Unknown Source)
    at com.services.smallrye.kafka.MyConsumer.callEndpoint(MyConsumer.java:339)
Sometimes I also get a third error scenario which says the REST client is closed, but it happens randomly, after running the test several times.
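For context on the InterruptedException trace: in plain Java, a blocking wait throws InterruptedException when its thread is interrupted, for example when the executor running it is shut down. Below is a minimal plain-JDK sketch of that mechanism. RetryInterruptSketch and callWithRetry are hypothetical names, and the retry loop is hand-rolled for illustration only (it is not SmallRye's implementation). Whether this is what actually happens in my tests is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class RetryInterruptSketch {

    /** Hypothetical retry helper: every attempt "fails", then waits before the next one. */
    static String callWithRetry(int maxRetries, long delayMillis) throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            // The real call is omitted; the sketch only models the wait between attempts.
            Thread.sleep(delayMillis); // throws InterruptedException if the thread is interrupted
        }
        return "gave up";
    }

    /** Starts the retry loop on a worker thread, then shuts the executor down while it waits. */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Future<String> outcome = executor.submit(() -> {
            started.countDown();
            try {
                return callWithRetry(2, 10_000);
            } catch (InterruptedException e) {
                return "interrupted";
            }
        });
        try {
            started.await();        // make sure the task is running
            executor.shutdownNow(); // interrupts the worker thread
            return outcome.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the worker reports "interrupted" as soon as the executor is shut down, instead of finishing its retries.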
This is basically what I have in my consumer class:
@ApplicationScoped
public class MyConsumer {

    private static Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @Inject
    ManagedExecutor managedExecutor;

    @Inject
    @RestClient
    MyInterface myEndpoint;

    // other interface definitions for different scenarios...
    @Inject
    @RestClient
    MyOtherInterface myOtherEndpoint;

    @Inject
    ObjectMapper mapper;

    /**
     * Picks a message from the "notification" channel and processes it
     * accordingly.
     *
     * @param message the incoming message
     * @return CompletionStage<Void>
     */
    @Incoming("incoming-messages")
    @Blocking(ordered = false)
    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
    public CompletionStage<Void> consumeFromMyTopic(Message<String> message) {
        // hand the processing over to the managed executor
        return CompletableFuture.supplyAsync(() -> processMessage(message), managedExecutor);
    }

    private Void processMessage(Message<String> message) {
        try {
            IncomingKafkaRecordMetadata<String, String> metadata = message
                    .getMetadata(IncomingKafkaRecordMetadata.class)
                    .orElseThrow(() -> new IllegalStateException("Metadata not found in the message"));
            String payload = message.getPayload();
            ....
            // logic to transform my event into a required Request to call myInterface
            Request request = mapper.readValue(payload, Request.class);
            callEndpoint(request);
            ....
            message.ack();
        }
        .....
My tests look like the following. They use Mockito and the @InjectMock annotation to simulate several scenarios, such as the expected responses for success (and error) cases. I'm using the in-memory connector to avoid connecting to Kafka or using Testcontainers:
@QuarkusTest
@QuarkusTestResource(KafkaTestResource.class)
class MyConsumerTest {

    @Inject
    @Any
    InMemoryConnector connector;

    // "interface" is a reserved word in Java, so the field is named myInterface here
    @InjectMock
    @RestClient
    MyInterface myInterface;

    private InMemorySource<Message<String>> incoming;
    // for any error going to an outgoing item
    private InMemorySink<String> outgoing;
    private IncomingKafkaRecordMetadata mockMetadata;

    @BeforeEach
    public void setup() {
        incoming = connector.source("incoming-messages");
        outgoing = connector.sink("producer");
        mockMetadata = Mockito.mock(IncomingKafkaRecordMetadata.class);
        //...mock metadata for incoming kafka record
        when(myInterface.callEndpoint(any(Request.class)))
                .thenReturn(new Response()......));
    }

    @Test
    void testValidMessage() throws JsonProcessingException {
        Event event = TestUtils.generateEvent();
        ObjectMapper mapper = new ObjectMapper();
        Metadata metadata = Metadata.of(mockMetadata);
        Message<String> message = Message.of(mapper.writeValueAsString(event), metadata);
        incoming.send(message);
        await().atMost(40, TimeUnit.SECONDS).<List<? extends Message<String>>>until(outgoing::received, t -> t.size() == 0);
        // Temporary assertion because the processing runs in a CompletableFuture.
        Assertions.assertEquals(0, outgoing.received().size());
    }
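One thing worth noting about the temporary assertion: a condition like t.size() == 0 already holds before any message has been processed, so the await() call can return on its first poll while the CompletableFuture is still running. Here is a minimal plain-JDK sketch of that timing. AwaitSketch, process and the two latches are hypothetical stand-ins (not Quarkus or SmallRye APIs), and waiting on an explicit "acked" signal is only one possible way to make a test wait for the background work.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitSketch {

    /** Hypothetical stand-in for the consumer: blocks on another thread, then "acks". */
    static CompletableFuture<Void> process(ExecutorService executor,
                                           CountDownLatch release,
                                           CountDownLatch acked) {
        return CompletableFuture.runAsync(() -> {
            try {
                release.await();   // stands in for the slow REST call
                acked.countDown(); // stands in for message.ack()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted before the ack
            }
        }, executor);
    }

    /** Returns { emptyWhileStillRunning, ackedInTime }. */
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> errors = new CopyOnWriteArrayList<>(); // stands in for outgoing.received()
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch acked = new CountDownLatch(1);
        try {
            CompletableFuture<Void> result = process(executor, release, acked);
            // "No errors received" is already true although the work has not finished.
            boolean emptyWhileStillRunning = errors.isEmpty() && !result.isDone();
            release.countDown();
            // Waiting on an explicit completion signal covers the background work.
            boolean ackedInTime = acked.await(5, TimeUnit.SECONDS);
            return new boolean[] { emptyWhileStillRunning, ackedInTime };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("empty while still running: " + r[0]);
        System.out.println("acked in time: " + r[1]);
    }
}
```

In the sketch, the "empty" check passes while the task is still blocked, whereas the latch only opens once the simulated ack has happened.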
For reference, I'm also using fault tolerance in my interface definitions:
@RegisterRestClient
public interface MyInterface {

    @POST
    @Path("/mypath/...")
    @Retry(maxRetries = 2)
    ResponseType callEndpoint(Request request);
Is there anything I'm not considering, or anything missing, to make this work as expected?
What I've tried: I ran the tests without the fault tolerance annotations, and I also tried WireMock to simulate the HTTP calls, but I got errors very similar to these while processing, with no positive results.