Reading from a Flux<Integer> in chunks

1.6k views Asked by At

Is it possible do read from a webflux flux in chunks? ( other than using delayElements )

For example after I write

Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();

is there any way to continue to read the next 5 integers?

If not, is there an alternative for the consumer to decide when to request the next piece of data?

Edit:

To clarify, I would like to read the first 5 values, then pause until an arbitrary later time in the program, then read the next 5 values without recreating the emitter flux. simply calling buffer() won't do

2

There are 2 answers

1
jasiustasiu On

buffer is the keyword you're looking for

Flux.range(1, 10)
            .buffer(5)
            .doOnNext(System.out::println)
            .subscribe();

output:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--

0
Alexei Kaigorodov On

then you need a full-fledged asynchronous subscriber object, not just a chain of methods.

// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;

public class FluxSubscriberTest {

    @Test
    public  void test10() {
        FluxSubscriber subscriber = new FluxSubscriber();
        Flux.range(1, 10).subscribe(subscriber.inp);
        subscriber.start();
        boolean ok = subscriber.blockingAwait(5000);
        Assert.assertTrue(ok);
    }

    static class FluxSubscriber extends Actor {
        InpFlow<Integer> inp = new InpFlow<>(this, 5);
        int count = 0;

        @Override
        protected void runAction() throws Throwable {
            if (inp.isCompleted()) {
                System.out.println("input stream completed");
                complete();
                return;
            }
            Integer value = inp.remove();
            System.out.println("value="+value);
            if (++count==5) {
                count = 0;
                System.out.println("pause:");
                delay(1000);
            }
        }
    }
}

In fact, it reads 5 items first, and then one by one after each call to inp.remove(). If this is not exactly what you want, then you can extend class InpFlow to modify the policy when it invokes Subscription.request().

Source codes are avalable at https://github.com/akaigoro/df4j (yes I am the author).