I'm slowly moving my application from RxJava 1 to RxJava 2. After updating all the code, everything works fine except for one use case, and I'm pretty lost at the moment. I guess I need to go back to the documentation to understand it properly.
The application loads a collection of Asset objects from the network, and if this operation takes more than x ms, it displays a loading animation. When the data is retrieved, the animation is stopped/removed and the data is displayed.
This is what I had with RxJava 1, which was working:
getAssetsSubscription = new GetAssetsUseCase().execute()
        .publish(new Func1<Observable<List<Asset>>, Observable<List<Asset>>>() {
            @Override
            public Observable<List<Asset>> call(Observable<List<Asset>> o) {
                return o.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                        Observable.fromCallable(new Callable<List<Asset>>() {
                            @Override
                            public List<Asset> call() throws Exception {
                                if (isAdded()) {
                                    getActivity().runOnUiThread(new Runnable() {
                                        @Override
                                        public void run() {
                                            setLoadingViewVisibility(true);
                                        }
                                    });
                                }
                                return null;
                            }
                        })
                ).ignoreElements().mergeWith(o);
            }
        })
        .subscribe(new Subscriber<List<Asset>>() {
            @Override
            public void onCompleted() {
                // Do things...
            }

            @Override
            public void onError(Throwable e) {
                // Do things...
            }

            @Override
            public void onNext(List<Asset> assets) {
                // Do things...
            }
        });
And this is my "translation" to RxJava 2. With this, the data is never displayed: onComplete is always called, but onNext never is. This is also the case when the timeout is not triggered.
disposables.add(new GetAssetsUseCase().execute()
        .publish(new Function<Observable<List<Asset>>, ObservableSource<List<Asset>>>() {
            @Override
            public ObservableSource<List<Asset>> apply(Observable<List<Asset>> listObservable)
                    throws Exception {
                return listObservable.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                        Observable.fromCallable(new Callable<List<Asset>>() {
                            @Override
                            public List<Asset> call() throws Exception {
                                if (isAdded()) {
                                    getActivity().runOnUiThread(new Runnable() {
                                        @Override
                                        public void run() {
                                            setLoadingViewVisibility(true);
                                        }
                                    });
                                }
                                return null;
                            }
                        })
                ).ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable();
            }
        })
        .subscribeWith(new DisposableObserver<List<Asset>>() {
            @Override
            public void onComplete() {
                // Do things...
            }

            @Override
            public void onError(Throwable e) {
                // Do things...
            }

            @Override
            public void onNext(List<Asset> assets) {
                // Do things...
            }
        }));
The original code used Observable#mergeWith(Observable). Since RxJava 2 narrows types where appropriate, this changed in your revised code to Completable.mergeWith(Completable). A Completable can only signal completion or an error, so no item ever reaches onNext.
To get the same behavior as the old code, you need to change the order of operations. Replace
.ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable()
with
.ignoreElements().<List<Asset>>toObservable().mergeWith(listObservable)
since Completable.fromObservable(...) is basically equivalent to Observable#ignoreElements().
In addition, the return null; will probably cause problems with RxJava 2, since the contract specifies that there can be no null values in the event stream. Consider replacing the Observable.fromCallable(...) with Completable.fromRunnable(...).toObservable().
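To make both suggestions concrete, here is a minimal sketch of how the reordered chain might look, next to the ordering from the question for comparison. It assumes RxJava 2.x (the io.reactivex package) on the classpath and Java 8 lambdas. The class name, the helper methods, the 100 ms threshold, and the String lists standing in for List<Asset> are all made up for illustration, and the showLoading Runnable stands in for the runOnUiThread call.

```java
import io.reactivex.Completable;
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LoadingIndicatorSketch {

    // Hypothetical value; the question does not show the real threshold.
    static final long LOADING_VIEW_THRESHOLD_MS = 100;

    // Ordering from the question: both sides of the merge are Completables,
    // so every item is dropped before toObservable() is reached.
    static <T> Observable<T> mergedAsCompletable(Observable<T> source) {
        return source.publish(shared ->
                shared.ignoreElements()
                        .mergeWith(Completable.fromObservable(shared))
                        .<T>toObservable());
    }

    // Reordered as suggested: convert back to Observable first, then merge
    // with the shared source so its items still reach onNext.
    static <T> Observable<T> withLoadingIndicator(Observable<T> source, Runnable showLoading) {
        return source.publish(shared ->
                shared.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
                                // Fallback runs the side effect and completes
                                // without ever emitting a (null) item.
                                Completable.fromRunnable(showLoading).<T>toObservable())
                        .ignoreElements()
                        .<T>toObservable()
                        .mergeWith(shared));
    }

    // Hypothetical stand-in for GetAssetsUseCase: one list, delivered after delayMs.
    static List<List<String>> load(long delayMs, AtomicBoolean loadingShown) {
        Observable<List<String>> source = Observable.just(Arrays.asList("a", "b"))
                .delay(delayMs, TimeUnit.MILLISECONDS);
        return withLoadingIndicator(source, () -> loadingShown.set(true))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        AtomicBoolean loadingShown = new AtomicBoolean(false);
        // Slow source: the timeout should fire first, then the data arrives.
        System.out.println(load(400, loadingShown) + " loadingShown=" + loadingShown.get());
        // Question's ordering: completes without emitting anything.
        System.out.println(mergedAsCompletable(Observable.just(1, 2, 3)).toList().blockingGet());
    }
}
```

Note that the fallback runs on whatever thread the timeout fires on (the computation scheduler by default), so in the real fragment the Runnable would still need to hop to the UI thread as the question's code does.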