Running blocking operations concurrently


I'm having an issue with a Quarkus application. Requests enter through a RESTEasy endpoint and fan out into a set of blocking downstream calls; to speed up execution, we want to run these blocking tasks concurrently. The current solution looks like this:

    public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
        // Convert the list of TradeRequest to a Multi
        return Multi.createFrom().iterable(sendTrades)
                // Use runSubscriptionOn to specify the executor for subscription handling
                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
                // Map each TradeRequest to a Uni<Trade> by validating the trade asynchronously
                .onItem().transformToUniAndMerge(tradeRequest ->
                        Uni.createFrom().item(() -> validateTrade(tradeRequest))
                )
                // Collect the results into a list
                .collect().asList()
                .await().atMost(Duration.ofMinutes(2));
    }

What we're seeing is that each validateTrade(tradeRequest) call is still being executed serially. I've tried different executors, such as Infrastructure.getDefaultWorkerPool() or:

    @Inject
    @ManagedExecutorConfig(maxAsync = 5, maxQueued = 5)
    @NamedInstance("tradeManagedExecutor")
    ManagedExecutor managedExecutor;

I would appreciate any help in this regard; maybe I'm just not understanding the Mutiny API or its integration with Quarkus.

One point to note: validateTrade(tradeRequest) also requires a JWT that lives in the application's request context, so that context must be propagated to the worker threads.

Edit:

Below are my latest findings from various attempts:

Alternative 1
This approach, which combines a list of Unis and emits on Infrastructure.getDefaultWorkerPool(), still runs sequentially:

    public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
        return Uni.combine().all()
                .unis(sendTrades.stream().map(trade -> Uni.createFrom().item(() -> validateTrade(trade))).toList())
                .combinedWith(listOfResponses -> listOfResponses.stream().map(Trade.class::cast).toList())
                .emitOn(Infrastructure.getDefaultWorkerPool())
                .await().atMost(Duration.ofMinutes(2));
    }

Alternative 2
This approach uses a parallel stream. It runs in parallel but fails with the error shown after the code.

    public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
        return sendTrades.stream()
                .parallel()
                .map(this::validateTrade)
                .toList();
    }
Caused by: jakarta.enterprise.context.ContextNotActiveException: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of CLASS bean [class=se.sebank.portfoliomgt.tradingservice.instrument.domain.InstrumentService, id=56969d55d00200c862f05c62808ce63ddb2d7514]
        - you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding
        at io.quarkus.arc.impl.ClientProxies.getDelegate(ClientProxies.java:55)
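
The ContextNotActiveException above is consistent with request-bound state being tied to the calling thread, so threads from another pool do not see it. As a rough illustration only, using plain JDK classes rather than Quarkus: in the sketch below, CURRENT_TOKEN and validate are hypothetical stand-ins for the JWT context and for validateTrade. It shows that a thread-local value is missing on a worker thread unless it is captured on the calling thread and handed over explicitly.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ContextCaptureSketch {
    // Hypothetical stand-in for request-bound state such as the JWT.
    static final ThreadLocal<String> CURRENT_TOKEN = new ThreadLocal<>();

    // Hypothetical stand-in for a blocking call that needs the token.
    static String validate(String trade, String token) {
        return trade + " validated with " + token;
    }

    static List<String> run() throws Exception {
        CURRENT_TOKEN.set("jwt-123"); // set on the calling ("request") thread
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Reading the thread-local on a worker thread finds nothing,
            // because the value belongs to the calling thread only.
            Callable<String> readOnWorker =
                    () -> CURRENT_TOKEN.get() == null ? "missing" : "present";
            Future<String> seenByWorker = pool.submit(readOnWorker);

            // Capturing the value on the calling thread and passing it
            // into the task makes it available to the worker.
            String captured = CURRENT_TOKEN.get();
            Callable<String> withCapture = () -> validate("trade-1", captured);
            Future<String> validated = pool.submit(withCapture);

            return List.of(seenByWorker.get(), validated.get());
        } finally {
            pool.shutdown();
            CURRENT_TOKEN.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In a Quarkus application the capture step would normally be left to a context-propagating executor instead of being done by hand; the sketch only shows why a plain thread pool or the common fork-join pool loses the context.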

Alternative 3
Not using Mutiny or parallel streams. In this case we start facing context propagation issues.

    public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) throws ExecutionException, InterruptedException {
        List<CompletableFuture<Trade>> futureTrades = sendTrades.stream()
                .map(trade -> CompletableFuture.supplyAsync(() -> this.validateTrade(trade), managedExecutor))
                .toList();

        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureTrades.toArray(new CompletableFuture[0]));

        return allDoneFuture.thenApply(v ->
                futureTrades.stream()
                        .map(CompletableFuture::join) // Retrieves the result of each CompletableFuture
                        .toList()).get();
    }

The error:

Error invoking subclass method
        at org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:107)
        at org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:344)
        at org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:205)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:452)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:240)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:154)
        at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
        at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:157)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:229)
        at io.quarkus.resteasy.runtime.standalone.RequestDispatcher.service(RequestDispatcher.java:82)
        at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.dispatch(VertxRequestHandler.java:147)
        at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler$1.run(VertxRequestHandler.java:93)
        at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
        at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.quarkus.arc.ArcUndeclaredThrowableException: Error invoking subclass method
        at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource_Subclass.validateTrades(Unknown Source)
        at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource_ClientProxy.validateTrades(Unknown Source)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:154)
        at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:118)
        at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:560)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:452)
        at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$2(ResourceMethodInvoker.java:413)
        at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:321)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:415)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:378)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:356)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:70)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:429)
        ... 15 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot read the array length because "params" is null
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at se.sebank.portfoliomgt.tradingservice.trade.domain.TradeService.validateTrades(TradeService.java:125)
        at se.sebank.portfoliomgt.tradingservice.trade.domain.TradeService_ClientProxy.validateTrades(Unknown Source)
        at se.sebank.portfoliomgt.tradingservice.trade.boundary.TradeResource.validateTrades(TradeResource.java:74)
Answer by firephil:

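It is because your path of execution is serial: the elements of the list are processed one by one. Uni.createFrom().item(...) runs its supplier on whichever thread subscribes to the Uni, and here every inner Uni is subscribed from the same worker thread:

    .onItem().transformToUniAndMerge(tradeRequest ->
            Uni.createFrom().item(() -> validateTrade(tradeRequest))
    )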

A parallel stream may be an alternative:

    sendTrades.parallelStream().forEach(this::validateTrade)

If that is still sequential (see the related question "Java ParallelStream: several map or single map"), I guess you need to feed the list manually to a task executor:

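    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService executor = Executors.newFixedThreadPool(8);

    // Submit eight independent tasks; the pool runs them on up to eight threads.
    for (int i = 0; i < 8; i++) {
        final int taskNumber = i;
        executor.execute(() -> {
            // Debug info: which thread ran which task
            System.out.println("Task " + taskNumber + " executed by thread " + Thread.currentThread().getName());
        });
    }

    // Stop accepting new tasks; tasks already submitted still run to completion.
    executor.shutdown();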
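
To get results back from the executor rather than only printing from each task, something along these lines might work. It is a minimal sketch using plain JDK classes: validateTrade here is a hypothetical stand-in that sleeps to simulate a blocking call, strings replace the Trade and TradeRequest types, and the pool size of 8 and the two-minute timeout are just carried over from the snippets above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ConcurrentValidationSketch {
    // Hypothetical stand-in for the blocking validateTrade call.
    static String validateTrade(String request) throws InterruptedException {
        Thread.sleep(200); // simulate a blocking downstream call
        return request + ":ok";
    }

    static List<String> validateAll(List<String> requests) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        try {
            // One Callable per request, so each can run on its own pool thread.
            List<Callable<String>> tasks = new ArrayList<>();
            for (String request : requests) {
                tasks.add(() -> validateTrade(request));
            }

            // invokeAll blocks until every task has finished or the timeout
            // expires, and returns the futures in the same order as the tasks.
            List<Future<String>> futures = executor.invokeAll(tasks, 2, TimeUnit.MINUTES);

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // get() throws ExecutionException if the task failed and
                // CancellationException if it was cancelled by the timeout.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(validateAll(List.of("a", "b", "c", "d")));
    }
}
```

A plain fixed thread pool does not carry the request context or the JWT to its threads. Since ManagedExecutor is itself an ExecutorService, the injected managedExecutor from the question could presumably be used in place of the fixed pool here, in which case it should not be shut down by this method.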