Buffer events, start new group 1 second after the last event from previous group


I am on Android and have created an Observable of touch-down events. I would like to buffer them for as long as new events keep arriving within a second of each other. Once a second has elapsed since the last touch-down emission, I want to emit a list of all collected events and start a new collection.
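To make the intended grouping rule concrete, here is a minimal plain-Java sketch that applies it to a list of timestamps rather than to live events. The class name `DebouncedGrouping`, the method `group`, and the sample timestamps are made up for illustration, and treating a gap of exactly one second as "closed" is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: illustrates the grouping rule on plain timestamps. */
final class DebouncedGrouping {

    /**
     * Splits ascending timestamps (in milliseconds) into groups. A group is
     * closed once the gap to the next timestamp is at least quietMillis
     * (assumption: a gap of exactly quietMillis also closes the group).
     */
    static List<List<Long>> group(List<Long> timestamps, long quietMillis) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : timestamps) {
            boolean quietPeriodElapsed = !current.isEmpty()
                    && t - current.get(current.size() - 1) >= quietMillis;
            if (quietPeriodElapsed) {
                groups.add(current);          // emit the collected events
                current = new ArrayList<>();  // start a new collection
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            groups.add(current);              // the last group closes at end of input
        }
        return groups;
    }

    public static void main(String[] args) {
        // Made-up touch-down times: three quick taps, a pause, two taps, a pause, one tap.
        List<Long> touches = Arrays.asList(0L, 300L, 900L, 2500L, 2600L, 5000L);
        System.out.println(group(touches, 1000L));
        // prints [[0, 300, 900], [2500, 2600], [5000]]
    }
}
```

With live events the group has to be closed by a timer rather than by the arrival of the next event, which is what both snippets below try to achieve.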

I have two working snippets, but the first does not feel very reactive and the second seems unnecessarily complex. Here is how the first one works:

  1. I use the buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) overload. When the Observable returned by the selector emits an item, a second has elapsed since the last source emission and the buffer can be emitted.
  2. The Observable that the selector function returns is a PublishSubject, so that I can decide when to push emissions to it.
  3. A Runnable task pushes new emissions to the closing subject. The task is scheduled (via an Android Handler) to run one second after the touch down currently being processed. If another touch down arrives within that second, the task is cancelled and rescheduled to run one second later.
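The cancel-and-reschedule idea in step 3 can be sketched outside Android and Rx with a `ScheduledExecutorService` standing in for the Handler. This is only an illustration under assumptions: `QuietPeriodBuffer` and its methods are hypothetical names, and it assumes Java 8 or newer for the lambdas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and flushes them after a quiet period. */
final class QuietPeriodBuffer<T> {
    // Daemon timer thread, so a forgotten shutdown() does not keep the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "quiet-period-timer");
                t.setDaemon(true);
                return t;
            });
    private final long quietMillis;
    private final Consumer<List<T>> onFlush;
    private List<T> pending = new ArrayList<>();
    private ScheduledFuture<?> scheduledFlush;

    QuietPeriodBuffer(long quietMillis, Consumer<List<T>> onFlush) {
        this.quietMillis = quietMillis;
        this.onFlush = onFlush;
    }

    /** Adds an item and restarts the quiet-period timer. */
    synchronized void add(T item) {
        pending.add(item);
        if (scheduledFlush != null) {
            scheduledFlush.cancel(false);  // plays the role of handler.removeCallbacks(r)
        }
        // plays the role of handler.postDelayed(r, 1000L)
        scheduledFlush = timer.schedule(this::flush, quietMillis, TimeUnit.MILLISECONDS);
    }

    /** Runs on the timer thread once no item has arrived for quietMillis. */
    private void flush() {
        List<T> batch;
        synchronized (this) {
            if (pending.isEmpty()) {
                return;  // an item that raced with the timer was already flushed
            }
            batch = pending;
            pending = new ArrayList<>();
        }
        onFlush.accept(batch);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        QuietPeriodBuffer<String> buffer = new QuietPeriodBuffer<>(
                1000L, batch -> System.out.println("got " + batch.size() + " items"));
        buffer.add("a");
        buffer.add("b");
        Thread.sleep(1500L);  // quiet period elapses, the first batch is flushed
        buffer.add("c");
        Thread.sleep(1500L);
        buffer.shutdown();
    }
}
```

As in the Handler version, the timer is state that lives outside the event stream, which is the part that does not feel reactive.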

Here is the relevant Android code:

    final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();

    final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
    viewGroup.setOnTouchListener(new View.OnTouchListener() {

        @Override
        public boolean onTouch(View v, MotionEvent event) {
            touchPublishSubject.onNext(event);

            return true;
        }
    });

    final PublishSubject<Object> windowClosePublishSubject = PublishSubject.create();
    final Handler handler = new Handler(Looper.getMainLooper());
    final Runnable r = new Runnable() {

        @Override
        public void run() {
            Log.d(TAG, "will emit closing item");
            windowClosePublishSubject.onNext("now!");
        }
    };

    ViewObservable
            .bindView(viewGroup, touchPublishSubject)
            .filter(new Func1<MotionEvent, Boolean>() {

                @Override
                public Boolean call(MotionEvent motionEvent) {
                    return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
                }
            })
            .doOnNext(new Action1<MotionEvent>() {

                @Override
                public void call(MotionEvent motionEvent) {
                    // restart the timer
                    Log.d(TAG, "cancelling closing");
                    handler.removeCallbacks(r);
                    Log.d(TAG, "scheduling closing");
                    handler.postDelayed(r, 1000L);

                    // show the touch
                    Log.d(TAG, motionEvent.toString());
                }
            })
            .buffer(new Func0<Observable<?>>() {

                @Override
                public Observable<?> call() {
                    Log.d(TAG, "creating buffer closing selector");
                    return windowClosePublishSubject
                            .doOnNext(new Action1<Object>() {

                                @Override
                                public void call(Object o) {
                                    Log.d(TAG, "emitting closing item '" + o + "'");
                                }
                            });
                }
            })
            .subscribe(new Action1<List<MotionEvent>>() {

                @Override
                public void call(List<MotionEvent> motionEvents) {
                    // show number of touch downs
                    Log.d(TAG, "got " + motionEvents.size() + " touch downs");
                }
            });

I don't fancy the usage of Handler and all that in this solution, so I looked further.

The second snippet (the touchPublishSubject and the touch listener are exactly the same):

  1. I reuse touchPublishSubject, the source of the touch downs, as the window-closing observable, debouncing it first with a 1-second timeout.
  2. Apparently, because the debouncing happens on Schedulers.computation(), observing moves to that scheduler as well, and I need to use observeOn(AndroidSchedulers.mainThread()). I find it a bit strange that the scheduler of the nested Observable, which only closes the buffer windows, moves the whole chain onto its scheduler.
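One plausible way to read the scheduler behaviour in step 2: the buffered list is handed downstream at the moment the closing item arrives, so it travels on whichever thread delivered that item. The plain-Java analogy below is not RxJava code and does not show RxJava internals; `FlushThreadDemo` and the thread names are invented for illustration. It shows a callback running on the timer's thread until it is explicitly handed to another executor, which is the role observeOn plays in the snippet.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a timer callback stays on the timer thread unless it hops. */
final class FlushThreadDemo {

    private static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        return t;
    }

    /** Returns { thread that ran the timer callback, thread after the explicit hop }. */
    static String[] run(long delayMillis) {
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> daemon(r, "timer"));
        ExecutorService mainLike =
                Executors.newSingleThreadExecutor(r -> daemon(r, "main-like"));
        CompletableFuture<String> direct = new CompletableFuture<>();
        CompletableFuture<String> hopped = new CompletableFuture<>();

        timer.schedule(() -> {
            // Whatever the timer triggers runs here, on the timer's own thread.
            direct.complete(Thread.currentThread().getName());
            // Explicit hand-off to another thread, analogous to observeOn(...).
            mainLike.execute(() -> hopped.complete(Thread.currentThread().getName()));
        }, delayMillis, TimeUnit.MILLISECONDS);

        String[] names = { direct.join(), hopped.join() };
        timer.shutdown();
        mainLike.shutdown();
        return names;
    }

    public static void main(String[] args) {
        String[] names = run(100L);
        System.out.println(names[0] + " -> " + names[1]);
        // prints timer -> main-like
    }
}
```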

The code:

    final PublishSubject<MotionEvent> touchPublishSubject = PublishSubject.create();

    final ViewGroup viewGroup = (ViewGroup) findViewById(android.R.id.content);
    viewGroup.setOnTouchListener(new View.OnTouchListener() {

        @Override
        public boolean onTouch(View v, MotionEvent event) {
            touchPublishSubject.onNext(event);

            return true;
        }
    });

    ViewObservable
            .bindView(viewGroup, touchPublishSubject)
            .filter(new Func1<MotionEvent, Boolean>() {

                @Override
                public Boolean call(MotionEvent motionEvent) {
                    return motionEvent.getAction() == MotionEvent.ACTION_DOWN;
                }
            })
            .doOnNext(new Action1<MotionEvent>() {

                @Override
                public void call(MotionEvent motionEvent) {
                    // show the touch
                    Log.d(TAG, motionEvent.toString());
                }
            })
            .buffer(new Func0<Observable<?>>() {

                @Override
                public Observable<?> call() {
                    Log.d(TAG, "creating buffer closing selector");
                    return touchPublishSubject
                            .debounce(1L, TimeUnit.SECONDS)
                            .doOnNext(new Action1<Object>() {

                                @Override
                                public void call(Object o) {
                                    Log.d(TAG, "emitting closing item '" + o + "'");
                                }
                            });
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<MotionEvent>>() {

                @Override
                public void call(List<MotionEvent> motionEvents) {
                    // show number of touch downs
                    Log.d(TAG, "got " + motionEvents.size() + " touch downs");
                }
            });

This code works, and I like it better than the first one because it feels more like how this should be done with Rx. But it is still complex because of the nested Observable and the mental gymnastics needed to follow it. Is there a buffer overload I'm missing that does the same automatically (i.e. closes its window once the last emission is 1 second old)?

Edit: one of the comments made me aware of a presentation by Ben Christensen, and then I found this: https://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/, which links to a few implementations of this problem. It seems like a pretty common requirement, so it would be nice to have a built-in operator for it. Anyway, I would consider the solutions presented in those sources and here to be canon for this type of problem.
