While using parallel() on a Flux, I pause one of the elements for some time with Thread.sleep. The problem is that the Flux does not wait for the sleep to finish: onComplete is executed right after subscribing, and the sleeping element is never printed.
    List<String> str = new ArrayList<>();
    str.add("spring");
    str.add("webflux");
    str.add("example");

    AtomicInteger num = new AtomicInteger();
    ParallelFlux<String> names = Flux.fromIterable(str)
        .log()
        .parallel(2)
        .runOn(Schedulers.boundedElastic())
        .map(s -> {
            if (s.equalsIgnoreCase("webflux")) {
                try {
                    System.out.println("waiting...");
                    Thread.sleep(1000);
                    System.out.println("done...");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            return s + " " + num.incrementAndGet();
        });

    names.subscribe(s -> {
        System.out.println("value " + s + " thread : " + Thread.currentThread().getName());
    });
Output:
19:35:24.870 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
19:35:24.896 [main] INFO reactor.Flux.Iterable.1 - | request(256)
19:35:24.897 [main] INFO reactor.Flux.Iterable.1 - | onNext(spring)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(webflux)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(example)
waiting...
value spring 1 thread : boundedElastic-1
value example 2 thread : boundedElastic-1
19:35:24.899 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
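A possible explanation, assuming the snippet runs from a plain main method: subscribe() on a ParallelFlux returns immediately, and the boundedElastic workers are daemon threads, so the JVM can exit while "webflux" is still sleeping. The onComplete() in the log comes from log(), which sits before parallel(), so it only reports that the source iterable finished emitting. The sketch below reproduces the same situation with plain JDK classes rather than Reactor; the class name DaemonWaitSketch and the method processAndWait are hypothetical, and the CountDownLatch stands in for whatever blocking step the real code would use.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DaemonWaitSketch {

    // Processes each item on a daemon worker thread and blocks the caller until all are done.
    static List<String> processAndWait(List<String> items) throws InterruptedException {
        // Daemon threads do not keep the JVM alive, like Reactor's scheduler threads.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(items.size());
        for (String s : items) {
            pool.submit(() -> {
                try {
                    if (s.equalsIgnoreCase("webflux")) {
                        Thread.sleep(1000); // simulated slow element
                    }
                    results.add(s);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        // Without this wait the caller could return, and main could exit, before the sleep ends.
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAndWait(List.of("spring", "webflux", "example")));
    }
}
```

In Reactor itself, one way to get the same effect is to block the calling thread on the merged result, for example names.sequential().blockLast(), instead of calling subscribe() and letting main return. Blocking like this is reasonable in a main method or a test, but not inside a non-blocking WebFlux handler.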