I'm having an issue with a Quarkus application. We enter through a RESTEasy endpoint and want to fan out to a set of blocking downstream calls; to speed up execution, we want to run these blocking tasks concurrently. Currently the solution looks like this:
public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
    // Convert the list of TradeRequest to a Multi
    return Multi.createFrom().iterable(sendTrades)
            // Use runSubscriptionOn to specify the executor for subscription handling
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            // Map each TradeRequest to a Uni<Trade> by validating the trade asynchronously
            .onItem().transformToUniAndMerge(tradeRequest ->
                    Uni.createFrom().item(() -> validateTrade(tradeRequest))
            )
            // Collect the results into a list
            .collect().asList()
            .await().atMost(Duration.ofMinutes(2));
}
What we're seeing is that each validateTrade(tradeRequest) is still being executed serially. I've tried using different executors, such as Infrastructure.getDefaultWorkerPool() or:
@Inject
@ManagedExecutorConfig(maxAsync = 5, maxQueued = 5)
@NamedInstance("tradeManagedExecutor")
ManagedExecutor managedExecutor;
I would appreciate any help in this regard; maybe I'm just not understanding the Mutiny API or its integration with Quarkus.
One point to note is that validateTrade(tradeRequest) also requires a JWT that is in the context of the application, so this context must be propagated to the worker threads.
Edit:
I'm adding some of my latest findings from various attempts below:
Alternative 1
This approach, which builds one Uni per trade and combines them with Infrastructure.getDefaultWorkerPool(), still runs sequentially:
public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
    return Uni.combine().all()
            .unis(sendTrades.stream().map(trade -> Uni.createFrom().item(() -> validateTrade(trade))).toList())
            .combinedWith(listOfResponses -> listOfResponses.stream().map(Trade.class::cast).toList())
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .await().atMost(Duration.ofMinutes(2));
}
Alternative 2
This approach, which uses a parallel stream instead of Mutiny, runs in parallel but fails with the error shown after the code:
public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
    return sendTrades.stream()
            .parallel()
            .map(this::validateTrade)
            .toList();
}
Caused by: jakarta.enterprise.context.ContextNotActiveException: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of CLASS bean [class=se.sebank.portfoliomgt.tradingservice.instrument.domain.InstrumentService, id=56969d55d00200c862f05c62808ce63ddb2d7514]
- you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding
at io.quarkus.arc.impl.ClientProxies.getDelegate(ClientProxies.java:55)
Alternative 3
This approach uses neither Mutiny nor parallel streams, only CompletableFuture with the managed executor. Here we start facing context propagation issues:
public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) throws ExecutionException, InterruptedException {
    List<CompletableFuture<Trade>> futureTrades = sendTrades.stream()
            .map(trade -> CompletableFuture.supplyAsync(() -> this.validateTrade(trade), managedExecutor))
            .toList();
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureTrades.toArray(new CompletableFuture[0]));
    return allDoneFuture.thenApply(v ->
            futureTrades.stream()
                    .map(CompletableFuture::join) // Retrieves the result of each CompletableFuture
                    .toList()).get();
}
The error:
Error invoking subclass method
at org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:107)
at org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:344)
at org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:205)
at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:452)
at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:240)
at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:154)
at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:157)
at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:229)
at io.quarkus.resteasy.runtime.standalone.RequestDispatcher.service(RequestDispatcher.java:82)
at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.dispatch(VertxRequestHandler.java:147)
at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler$1.run(VertxRequestHandler.java:93)
at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method
at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource_Subclass.validateTrades(Unknown Source)
at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource_ClientProxy.validateTrades(Unknown Source)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:154)
at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:118)
at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:560)
at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:452)
at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$2(ResourceMethodInvoker.java:413)
at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:415)
at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:378)
at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:356)
at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:70)
at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:429)
... 15 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot read the array length because "params" is null
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at se.sebank.portfoliomgt.tradingservice.trade.domain.TradeService.validateTrades(TradeService.java:125)
at se.sebank.portfoliomgt.tradingservice.trade.domain.TradeService_ClientProxy.validateTrades(Unknown Source)
at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource.validateTrades(TradeResource.java:74)
It is because your path of execution is serial: the elements of the list are processed one by one.
Maybe use a parallel Stream as an alternative:
Java ParallelStream: several map or single map
So I guess you need to feed the list manually to a task executor.
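As a rough illustration of feeding the list to an executor by hand, here is a minimal plain-JDK sketch. The class name ParallelValidation, the helper runAll, and the validate method are hypothetical stand-ins (validate only simulates the blocking validateTrade call), and the fixed thread pool is an assumption; it does not do any Quarkus context propagation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelValidation {

    // Hypothetical stand-in for the blocking validateTrade(...) call.
    static String validate(String request) {
        try {
            Thread.sleep(200); // simulate a blocking downstream call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return request + ":ok";
    }

    // Submits one task per input to the executor, then waits for all of them.
    // The first stream is fully collected before any join(), so every task is
    // already submitted when the calling thread starts waiting.
    static <T, R> List<R> runAll(List<T> inputs, Function<T, R> task, ExecutorService executor) {
        List<CompletableFuture<R>> futures = inputs.stream()
                .map(input -> CompletableFuture.supplyAsync(() -> task.apply(input), executor))
                .collect(Collectors.toList());
        // join() blocks per future; results come back in input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption: a plain fixed pool; pool size chosen only for the demo.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            List<String> results = runAll(List.of("a", "b", "c", "d"), ParallelValidation::validate, executor);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(results + " in " + elapsedMs + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```

The point of the sketch is that each blocking call is handed to the executor explicitly, rather than being evaluated lazily on whichever thread happens to subscribe. In a Quarkus application the executor would presumably need to be a context-propagating one; whether that alone resolves the request-scope and JWT errors shown in the question is not established here.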