I have a piece of code who's work is to update a local cache. There are two triggers to this cache update:
- At a fixed interval
- When requested
So here's a basic example on how I did this.
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
Then later i have
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
This works but the problem is when I start to have multiple concurrent calls to forceReload()
: There will be no concurrent execution of reloadData()
but the elements will queue up and the process will loop on reloading data until all the events sent to forceReloadEvents
have been consumed even though forceReload()
already completed due to previous events releasing the CountDownLatch
.
I wanted to use onBackPressureDrop
but it seems there's no induced backpressure and nothing is dropped. What I'd like is some way to force backpressure so that the merge understands that only one element can be processed at a time and that any subsequent event must be dropped until the current execution is done.
I thought about using buffer
or throttleFirst
too but I don't want to force a specific time between each event and i'd rather have this auto-scaling depending on the time it takes to reload the cache. You can think of it as throttleFirst
until reloadData
has completed.
Edit: based on the comments, you can have an AtomicBoolean as a gate in the flatMap to not start a reload until the gate is open again: