Observable timeout logic: migration to RxJava2

563 views Asked by At

I'm slowly moving my application from RxJava 1 to RxJava 2. Everything works fine after having updated all the code, except for one use case and I'm kind of pretty lost at the moment, I guess I need to go back to the document to get it properly.

The application loads a collection of Asset from the network and it this operation takes from than x ms, it displays a loading animation. Then when the data is retrieved, animation is stopped/removed and data displayed.

This what I had with RxJava 1 and which was working:

getAssetsSubscription = new GetAssetsUseCase().execute()
                    .publish(new Func1<Observable<List<Asset>>, Observable<List<Asset>>>() {
                        @Override
                        public Observable<List<Asset>> call(Observable<List<Asset>> o) {
                            return o.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                                    Observable.fromCallable(new Callable<List<Asset>>() {
                                         @Override
                                         public List<Asset> call() throws Exception {
                                             if (isAdded()) {
                                                 getActivity().runOnUiThread(new Runnable() {
                                                     @Override
                                                     public void run() {
                                                         setLoadingViewVisibility(true);
                                                     }
                                                 });
                                             }
                                             return null;
                                         }
                                     }
                                )
                            ).ignoreElements().mergeWith(o);
                        }
                    })
                    .subscribe(new Subscriber<List<Asset>>() {
                @Override
                public void onCompleted() {
                    // Do things...
                }

                @Override
                public void onError(Throwable e) {
                    // Do things...
                }

                @Override
                public void onNext(List<Asset> assets) {
                    // Do things...
                }
            });

And this is my "translation" to RxJava 2: with this, the data is never displayed, onComplete is always called, but onNext never. This is also the case when the timeout is not triggered.

disposables.add(new GetAssetsUseCase().execute().publish(new Function<Observable<List<Asset>>,
                    ObservableSource<List<Asset>>>() {
                @Override
                public ObservableSource<List<Asset>> apply(Observable<List<Asset>> listObservable) throws
                        Exception {
                    return listObservable.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                            Observable.fromCallable(new Callable<List<Asset>>() {
                                @Override
                                public List<Asset> call() throws Exception {
                                    if (isAdded()) {
                                        getActivity().runOnUiThread(new Runnable() {
                                            @Override
                                            public void run() {
                                                setLoadingViewVisibility(true);
                                            }
                                        });
                                    }
                                    return null;
                                }
                            })
                    ).ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable();
                }
            })
            .subscribeWith(new DisposableObserver<List<Asset>>() {
                @Override
                public void onComplete() {
                    // Do things...
                }

                @Override
                public void onError(Throwable e) {
                    // Do things...
                }

                @Override
                public void onNext(List<Asset> assets) {
                    // Do things...
                }
            }));
2

There are 2 answers

1
Kiskae On BEST ANSWER

The original code used Observable#mergeWith(Observable). Since RxJava2 narrows types where appropriate this changed in your revised code to Completable.mergeWith(Completable).

To get the same behavior as the old code you need to change the order of operations:

  • from .ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable()
  • to .ignoreElements().<List<Asset>>toObservable().mergeWith(listObservable)

since Completable.fromObservable(...) is basically equivalent to Observable#ignoreElements().

In addition the return null; will probably cause problems with RxJava2 since the contract specifies there can be no null values in the event stream. Consider replacing the Observable.fromCallable(...) with Completable.fromRunnable(...).toObservable()

1
EpicPandaForce On

That's because your callable returns null, which means it is a terminal event.

@Override
public List<Asset> call() throws Exception {
    if(isAdded()) {
        getActivity().runOnUiThread(new Runnable() { 
            @Override
            public void run() {
                setLoadingViewVisibility(true);
            }
        });
    }
    return null;
}

Should be

@Override
public List<Asset> call() throws Exception {
    if(isAdded()) {
        getActivity().runOnUiThread(new Runnable() { // TODO: move after `observeOn(AndroidSchedulers.mainThread())`
            @Override
            public void run() {
                setLoadingViewVisibility(true);
            }
        });
    }
    return Collections.emptyList();
}