I am on Android and I created an Observable
of touch downs. And I would like to buffer them as long as new events are coming within a second. When a second elapsed from the last touch down emission, I want to create a list of all collected events and emit it, and start a new collection.
I have 2 snippets of code which work but they either don't seem too reactive (#1) or unnecessarily complex (#2). Here they are:
- I use the
buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector)
overload - when the returned selector emits an item, it means a second elapsed from the last source emission and the buffer can be emitted. - The
Obserbable
the selector function returns is aPublishSubject
so that I can decide when to push emissions to it. - There exists a
Runnable
task which pushes new emissions to the closing subject. This task is scheduled (via an Android Handler) to run a second after the currently processed source touch down emission happened. When a new source emission of a new touch down happens afterwards within a second, the task is cancelled, and a new one is scheduled again after a second.
Here is the relevant Android code:
final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();
final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
viewGroup.setOnTouchListener(new View.OnTouchListener() {
@Override
public boolean onTouch(View v, MotionEvent event) {
touchPublishSubject.onNext(event);
return true;
}
});
final PublishSubject<Object> windowClosePublishSubject = PublishSubject.create();
final Handler handler = new Handler(Looper.getMainLooper());
final Runnable r = new Runnable() {
@Override
public void run() {
Log.d(TAG, "will emit closing item");
windowClosePublishSubject.onNext("now!");
}
};
ViewObservable
.bindView(viewGroup, touchPublishSubject)
.filter(new Func1<MotionEvent, Boolean>() {
@Override
public Boolean call(MotionEvent motionEvent) {
return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
}
})
.doOnNext(new Action1<MotionEvent>() {
@Override
public void call(MotionEvent motionEvent) {
// restart the timer
Log.d(TAG, "cancelling closing");
handler.removeCallbacks(r);
Log.d(TAG, "scheduling closing");
handler.postDelayed(r, 1000L);
// show the touch
Log.d(TAG, motionEvent.toString());
}
})
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
Log.d(TAG, "creating buffer closing selector");
return windowClosePublishSubject
.doOnNext(new Action1<Object>() {
@Override
public void call(Object o) {
Log.d(TAG, "emitting closing item '" + o + "'");
}
});
}
})
.subscribe(new Action1<List<MotionEvent>>() {
@Override
public void call(List<MotionEvent> motionEvents) {
// show number of touch downs
Log.d(TAG, "got " + motionEvents.size() + " touch downs");
}
});
I don't fancy the usage of Handler
and all that in this solution, so I looked further.
The second snippet (the touchPublishSubject and the touch listener are exactly the same):
- I reuse the touch down generating
touchPublishSubject
as the window closing observable, debouncing it first with a 1 sec timeout - Apparently, as the debouncing happens on
Scheduler.computation()
, it moves observing to the same scheduler and I need to useobserveOn(AndroidSchedulers.mainThread())
- I find it a bit strange that the nestedObservable
's scheduler, which only closes the buffer windows, promotes the whole chain to happen in its scheduler as well
The code:
final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();
final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
viewGroup.setOnTouchListener(new View.OnTouchListener() {
@Override
public boolean onTouch(View v, MotionEvent event) {
touchPublishSubject.onNext(event);
return true;
}
});
ViewObservable
.bindView(viewGroup, touchPublishSubject)
.filter(new Func1<MotionEvent, Boolean>() {
@Override
public Boolean call(MotionEvent motionEvent) {
return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
}
})
.doOnNext(new Action1<MotionEvent>() {
@Override
public void call(MotionEvent motionEvent) {
// show the touch
Log.d(TAG, motionEvent.toString());
}
})
.buffer(new Func0<Observable<?>>() {
@Override
public Observable<?> call() {
Log.d(TAG, "creating buffer closing selector");
return touchPublishSubject
.debounce(1L, TimeUnit.SECONDS)
.doOnNext(new Action1<Object>() {
@Override
public void call(Object o) {
Log.d(TAG, "emitting closing item '" + o + "'");
}
});
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<MotionEvent>>() {
@Override
public void call(List<MotionEvent> motionEvents) {
// show number of touch downs
Log.d(TAG, "got " + motionEvents.size() + " touch downs");
}
});
This code works and I like it better than the first one, feels more like it should be done with Rx. But it is complex because of the nested Observable
and the brain gymnastics necessary to get it. Is there a buffer
overload I'm missing to automatically do the same (i.e. close its window after the last emission is 1 sec old)?
Edit: one of the comments made me aware of a presentation by Ben Christensen, and then I found this: https://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/, which links to a few implementation of the problem. Seems like a pretty common requirement, would be nice to have a built-in operator for this. Anyways, I would consider the solution presented in these other sources and here to be canon for this type of problems.