How can I "force" some kind of backpressure to avoid multiple executions in RxJava?


I have a piece of code whose job is to update a local cache. There are two triggers for this cache update:

  1. At a fixed interval
  2. When requested

So here's a basic example of how I did this:

forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
    .merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData(); // operation that may take long
        }
    })
    .publish();

dataUpdates.subscribe();
dataUpdates.connect();

Then later I have:

public void forceReload() {
    final CountDownLatch cdl = new CountDownLatch(1);

    dataUpdates
        .take(1)
        .subscribe(
            new Action1<Boolean>() {
                @Override
                public void call(Boolean b) {
                    cdl.countDown();
                }
            }
        );

    forceReloadEvents.onNext(-1L);

    try {
        cdl.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

This works, but the problem appears when there are multiple concurrent calls to forceReload(). reloadData() never executes concurrently, but the events queue up, and the process keeps reloading data until every event sent to forceReloadEvents has been consumed, even though forceReload() has already returned because an earlier event released the CountDownLatch.

I wanted to use onBackpressureDrop, but it seems no backpressure is induced and nothing is dropped. What I'd like is some way to force backpressure so that the merge understands that only one element can be processed at a time and that any subsequent event must be dropped until the current execution is done.

I also thought about using buffer or throttleFirst, but I don't want to force a specific time between events; I'd rather have this scale automatically with the time it takes to reload the cache. You can think of it as throttleFirst until reloadData has completed.


There are 2 answers

akarnokd

Edit: based on the comments, you can have an AtomicBoolean as a gate in the flatMap to not start a reload until the gate is open again:

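import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class AvoidReloadStorm {
    // Simulates a slow reload: emits true after a 10-second delay.
    static Observable<Boolean> reload() {
        return Observable.just(true)
                .doOnNext(v -> System.out.println("Reload started..."))
                .delay(10, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Reloaded"));
    }

    public static void main(String[] args) throws Exception {
        // Manual trigger: call manual.onNext(...) to request a reload.
        Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized();
        // Periodic trigger: fires every 5 seconds, faster than a reload takes.
        Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Timer reload"));

        // Gate: true while a reload is in flight.
        AtomicBoolean running = new AtomicBoolean();

        ConnectableObservable<Boolean> src = Observable
                .merge(manual.onBackpressureDrop(), timer.onBackpressureDrop())
                .observeOn(Schedulers.io())
                .flatMap(v -> {
                    // Only the trigger that flips the gate from false to true starts a reload.
                    if (running.compareAndSet(false, true)) {
                        return reload().doOnCompleted(() -> {
                            // Reopen the gate once the reload has completed.
                            running.set(false);
                        });
                    }
                    // Triggers that arrive during a reload are dropped.
                    System.out.println("Reload rejected");
                    return Observable.empty();
                }).publish();

        src.subscribe(System.out::println);

        src.connect();

        // Keep the JVM alive long enough to observe several cycles.
        Thread.sleep(100000);
    }
}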
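To look at the gate on its own, here is a minimal sketch of the same compare-and-set idea in plain Java, without RxJava. It is a synchronous simplification, so the gate is reopened in a finally block rather than in doOnCompleted. ReloadGate, tryRun and the two counters are hypothetical names for illustration and are not part of the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative "drop while busy" gate; all names here are hypothetical. */
final class ReloadGate {
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicInteger executed = new AtomicInteger();
    private final AtomicInteger rejected = new AtomicInteger();

    /** Runs the task only if no other run is in flight; returns false if the trigger was dropped. */
    boolean tryRun(Runnable task) {
        // Only one caller can flip the flag from false to true.
        if (!running.compareAndSet(false, true)) {
            rejected.incrementAndGet();
            return false;
        }
        try {
            task.run();
            executed.incrementAndGet();
            return true;
        } finally {
            // Reopen the gate even if the task throws.
            running.set(false);
        }
    }

    int executed() {
        return executed.get();
    }

    int rejected() {
        return rejected.get();
    }

    public static void main(String[] args) {
        ReloadGate gate = new ReloadGate();
        // The inner call stands in for a trigger that arrives while a reload is running.
        boolean outer = gate.tryRun(() ->
                System.out.println("inner trigger accepted: " + gate.tryRun(() -> { })));
        System.out.println("outer trigger accepted: " + outer);
    }
}
```

The point of compareAndSet here is that checking the flag and setting it happen as one atomic step, so two triggers racing on different threads cannot both start a reload.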
Crystark

I made this work thanks to akarnokd!

Here's the solution I created based on his answer:

Observable<Long> forceReloadEvents = this.forceReloadEvents
    .asObservable()
    .onBackpressureDrop();

Observable<Long> periodicReload = Observable
    .timer(0, pullInterval, TimeUnit.SECONDS)
    .onBackpressureDrop();

final AtomicBoolean running = new AtomicBoolean();

dataUpdates = Observable
    .merge(forceReloadEvents, periodicReload)
    .filter(new Func1<Long, Boolean>() {
        @Override
        public Boolean call(Long t) {
            return running.compareAndSet(false, true);
        }
    })
    .observeOn(Schedulers.io())
    .flatMap(new Func1<Long, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Long t) {
            return reloadData();
        }
    })
    .doOnNext(new Action1<Boolean>() {
        @Override
        public void call(Boolean t) {
            running.set(false);
        }
    })
    .publish();

dataUpdates.subscribe();
dataUpdates.connect();

I'm not sure onBackpressureDrop is useful here, but I set it as a precaution.

The forceReload code does not change.
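One caveat worth noting: in the code above the gate is reopened in doOnNext, so it relies on reloadData() emitting an item on every run. If a run ends without emitting, the gate stays closed. In RxJava 1.x, reopening it in doOnTerminate on the inner observable is one possible alternative. The idea of reopening on any termination can be sketched in plain Java, using CompletableFuture purely as a stand-in for Observable<Boolean>. AsyncReloadGate, tryStart and isRunning are hypothetical names, not part of the solution above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Illustrative async gate that reopens on success or failure; all names are hypothetical. */
final class AsyncReloadGate {
    private final AtomicBoolean running = new AtomicBoolean();

    /** Starts the reload unless one is in flight; an empty Optional means the trigger was dropped. */
    Optional<CompletableFuture<Boolean>> tryStart(Supplier<CompletableFuture<Boolean>> reload) {
        if (!running.compareAndSet(false, true)) {
            return Optional.empty();
        }
        CompletableFuture<Boolean> inFlight;
        try {
            inFlight = reload.get();
        } catch (RuntimeException e) {
            // The reload could not even be started: reopen the gate and rethrow.
            running.set(false);
            throw e;
        }
        // whenComplete runs for both normal and exceptional completion.
        return Optional.of(inFlight.whenComplete((value, error) -> running.set(false)));
    }

    boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        AsyncReloadGate gate = new AsyncReloadGate();
        CompletableFuture<Boolean> pending = new CompletableFuture<>();
        System.out.println("first accepted: " + gate.tryStart(() -> pending).isPresent());
        System.out.println("second accepted: "
                + gate.tryStart(() -> CompletableFuture.completedFuture(true)).isPresent());
        // A failed reload still reopens the gate.
        pending.completeExceptionally(new IllegalStateException("reload failed"));
        System.out.println("gate open again: " + !gate.isRunning());
    }
}
```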