I am new to Project Reactor (using reactor-core 3.4.18) and I am trying to parallelize a Flux consumer subscription. When I create a Scheduler with a maximum of 2 threads, it fails with the exception below, whereas with a thread count of 4 it works fine.
Scheduler schedulers = Schedulers.newBoundedElastic(2, 2, "PublishedThread");
Flux.range(1, 10)
    .parallel()
    .runOn(schedulers)
    .doOnNext(e -> printName(e))
    .subscribe();
[ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
Caused by: reactor.core.Exceptions$ReactorRejectedExecutionException: Task capacity of bounded elastic scheduler reached while scheduling 1 tasks (3/2)
    at reactor.core.Exceptions.failWithRejected(Exceptions.java:277)
Can someone help me understand why using fewer threads throws this exception?
This behaviour is documented in the scheduler creation method you use. From Schedulers.newBoundedElastic(int, int, String):
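So, with the scheduler you created (threadCap of 2 and queuedTaskCap of 2), you cannot dispatch more than 4 tasks at the same time. Because you did not specify the parallelism (the number of rails) when calling Flux.parallel(), it defaults to Schedulers.DEFAULT_POOL_SIZE, which is normally the number of available CPU cores. On a typical multi-core machine the Flux therefore most likely tries to create more than 4 parallel tasks.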
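To check how many rails Flux.parallel() actually creates on your machine, something like the following sketch could help. The class name DefaultRails and the helper defaultRails() are made up for illustration; ParallelFlux.parallelism() and Schedulers.DEFAULT_POOL_SIZE come from reactor-core.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper class, only meant to inspect the default rail count.
public class DefaultRails {

    // Number of rails chosen when parallel() is called without an argument.
    static int defaultRails() {
        return Flux.range(1, 10).parallel().parallelism();
    }

    public static void main(String[] args) {
        System.out.println("Rails used by parallel(): " + defaultRails());
        System.out.println("Schedulers.DEFAULT_POOL_SIZE: " + Schedulers.DEFAULT_POOL_SIZE);
    }
}
```

If the printed rail count is larger than what the bounded elastic scheduler can accept, the rejection in the question is the expected outcome.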
Workarounds:
Here is a full example creating a ParallelFlux running 8 rails on a new parallel scheduler of 3 threads, and properly waiting for it to finish:
The output of this example program is:
We can see that all 12 elements have been processed/dispatched on the 3 threads from the created scheduler.
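For reference, a program matching the description above (12 elements, 8 rails, a parallel scheduler of 3 threads, and blocking until completion) could be sketched as follows. This is a minimal reconstruction under assumptions, not necessarily the exact code the description refers to: the class name ParallelRailsSketch, the helper run(...), and the thread name prefix "worker" are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// Hypothetical sketch: explicit rail count on a fixed-size parallel scheduler.
public class ParallelRailsSketch {

    // Processes `count` elements on `rails` rails backed by `threads` threads
    // and returns how many elements each thread handled.
    static Map<String, Integer> run(int count, int rails, int threads) {
        Scheduler scheduler = Schedulers.newParallel("worker", threads);
        Map<String, Integer> perThread = new ConcurrentHashMap<>();
        try {
            Flux.range(1, count)
                .parallel(rails)      // explicit parallelism instead of the CPU-based default
                .runOn(scheduler)     // each rail gets a worker from the scheduler
                .doOnNext(e -> perThread.merge(Thread.currentThread().getName(), 1, Integer::sum))
                .sequential()         // merge the rails back into a single Flux
                .blockLast();         // wait for completion before returning
        } finally {
            scheduler.dispose();      // release the scheduler's threads
        }
        return perThread;
    }

    public static void main(String[] args) {
        run(12, 8, 3).forEach((thread, n) ->
            System.out.println(thread + " handled " + n + " element(s)"));
    }
}
```

Unlike subscribe(), which returns immediately, sequential().blockLast() keeps the calling thread waiting until every rail has completed, so the program does not exit before the work is done.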