Requerying and updating subscribers with RxJava2


I have a data layer in my app that is backed by a Retrofit service. (So far the only persistence is over the network. Later in development I will add offline-first local storage.)

Retrofit returns an Observable<List<Item>> to me when I make a call to the server. This works well. In my subscriber, I receive the list when I subscribe and can then populate my UI with the Items.

The issue I am having is this: if the list is modified (by some external mechanism), how can I make the observable re-query the Retrofit service and emit a new list of items? I would know that the data is stale, but I am not sure how to initiate a re-query.

Here is a pared-down version of my DataManager:

class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observable provided by Retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}

There are 2 answers

koperko On BEST ANSWER

The problem you are fighting here lies in the difference between hot and cold observables. There are many great articles you can find online that describe the differences in detail, so let me just cover the basics.

A cold observable creates a new producer for every subscriber. When two separate subscribers subscribe to the same cold observable, each receives its own instances of the emitted items. They might (!) be equal, but they are nevertheless different objects. Applied to your case, each subscriber gets its own producer, which requests the data from the server and passes it into the stream. Every subscriber is served with data from its own producer.

A hot observable shares one producer among all its observers. If the producer is, for example, iterating through a collection of objects, a second subscriber that joins in the middle of the emissions receives only the items emitted afterwards (unless the stream is modified by an operator such as replay). Every object received by a subscriber is also the same instance for all observers, because it comes from a single producer.

So by the looks of it, you need a hot observable to distribute your data: when you know the data is no longer valid, you emit the fresh list once through the hot observable and every observer receives the update.
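To make the difference concrete, here is a minimal sketch, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name HotVsColdSketch and the call counter are made up for illustration; Observable.fromCallable stands in for a deferred source such as a Retrofit call, and PublishSubject plays the role of the hot source.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class HotVsColdSketch {

    /** Cold: the callable (a stand-in for a server request) runs once per subscriber. */
    static List<Integer> coldEmissions() {
        AtomicInteger serverCalls = new AtomicInteger();
        Observable<Integer> cold = Observable.fromCallable(serverCalls::incrementAndGet);

        List<Integer> received = new ArrayList<>();
        cold.subscribe(received::add); // first subscriber triggers call #1
        cold.subscribe(received::add); // second subscriber triggers call #2
        return received;               // [1, 2]
    }

    /** Hot: a late subscriber only sees what is emitted after it subscribed. */
    static List<String> lateSubscriberEmissions() {
        PublishSubject<String> hot = PublishSubject.create();

        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        hot.subscribe(early::add);
        hot.onNext("a");           // only the early subscriber is listening
        hot.subscribe(late::add);
        hot.onNext("b");           // both subscribers receive the same instance
        return late;               // [b]
    }

    public static void main(String[] args) {
        System.out.println("cold: " + coldEmissions());
        System.out.println("hot, late subscriber: " + lateSubscriberEmissions());
    }
}
```

The cold source does its work again for each subscriber, which is why subscribing again to the Retrofit observable yields fresh data, while the subject pushes one emission to whoever is subscribed at that moment.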

Luckily, turning a cold observable into a hot one is usually not a big deal. You can create your own producer that mimics this behavior, use one of the popular operators such as share, or transform the stream so that it behaves like a hot one.

I would advise using PublishSubject for refreshing the data and merging it with the original cold observable like this:

class DataManager {

    .....

    PublishSubject<Boolean> refreshSubject = PublishSubject.create();

    // The observable for retrieving always fresh data
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
        itemsObservable = itemsService.getItems()
                              .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()));
    }


    public Observable<List<Item>> getItems(){
        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            refreshSubject.onNext(true);
        });

        return call;
    }
}
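
For anyone who wants to try the merge idea without a server, here is a self-contained sketch assuming RxJava 2. FakeItemsService is a hypothetical in-memory stand-in for the Retrofit service, and items are plain Strings. It also differs from the snippet above in one respect: addItem uses doOnComplete instead of subscribing inside the method, so the request runs only once, when the caller subscribes.

```java
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class RefreshingDataManagerSketch {

    /** Hypothetical in-memory stand-in for the Retrofit service. */
    static class FakeItemsService {
        private final List<String> stored = new ArrayList<>();

        Observable<List<String>> getItems() {
            // Deferred: a snapshot of the list is taken on every subscription.
            return Observable.<List<String>>fromCallable(() -> new ArrayList<>(stored));
        }

        Completable addItem(String item) {
            return Completable.fromAction(() -> stored.add(item));
        }
    }

    static class DataManager {
        private final FakeItemsService service;
        private final PublishSubject<Boolean> refreshSubject = PublishSubject.create();
        private final Observable<List<String>> items;

        DataManager(FakeItemsService service) {
            this.service = service;
            // Initial query, followed by one new query per refresh signal.
            this.items = service.getItems()
                    .mergeWith(refreshSubject.flatMap(refresh -> service.getItems()));
        }

        Observable<List<String>> getItems() {
            return items;
        }

        Completable addItem(String item) {
            // The refresh signal fires after the add completes for the caller's subscription.
            return service.addItem(item)
                    .doOnComplete(() -> refreshSubject.onNext(true));
        }
    }

    /** Subscribes first, then adds an item; returns every list the subscriber saw. */
    static List<List<String>> demo() {
        DataManager manager = new DataManager(new FakeItemsService());
        List<List<String>> seen = new ArrayList<>();
        manager.getItems().subscribe(seen::add); // initial query: empty list
        manager.addItem("apple").subscribe();    // triggers a requery: [apple]
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the merged observable never completes, because the refresh subject stays open, so subscribers should dispose their subscription when they no longer need updates.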
akarnokd On

I guess that itemsService.getItems() returns a single-element Observable, so consumers have to resubscribe to get fresh data anyway, and they will get it because Retrofit Observables are deferred/lazy.

You could have a separate, long-lived Observable, backed by a PublishSubject, that you trigger when the data changes:

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();

public Observable<Object> itemsChanged() {
    return onItemsChanged;
}

public Completable addItem(Item item){
    Completable call = itemsService.addItem(item);

    // prevent triggering the addItem multiple times
    // Needs RxJava 2 Extensions library for now
    // as there is no Completable.cache() or equivalent in 2.0.3
    CompletableSubject cs = CompletableSubject.create();

    call.doOnComplete(() -> onItemsChanged.onNext("changed"))
    .subscribe(cs);

    return cs;
}
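
On the consumer side, one possible way to use such a change signal is to start with an initial trigger and map every signal to a new query. The sketch below assumes RxJava 2; the class name, the in-memory list and the "initial" marker are invented for illustration, and the fromCallable source stands in for itemsService.getItems().

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

public class RequeryOnChangeSketch {

    /** Returns every list a consumer sees: the initial one plus one per change signal. */
    static List<List<String>> demo() {
        // Hypothetical in-memory stand-in for the server-side list.
        List<String> stored = new ArrayList<>();

        // Same kind of trigger as in the answer: emits whenever the data changes.
        Subject<Object> onItemsChanged = PublishSubject.<Object>create().toSerialized();

        // Deferred query: takes a fresh snapshot on every subscription.
        Observable<List<String>> query =
                Observable.<List<String>>fromCallable(() -> new ArrayList<>(stored));

        // startWith forces an initial query; switchMap requeries on each signal
        // and drops a still-running older query if a newer signal arrives.
        Observable<List<String>> freshItems = onItemsChanged
                .startWith("initial")
                .switchMap(signal -> query);

        List<List<String>> seen = new ArrayList<>();
        freshItems.subscribe(seen::add);   // initial query: empty list

        stored.add("apple");               // the data changes ...
        onItemsChanged.onNext("changed");  // ... and the signal triggers a requery
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

switchMap is a design choice here: with flatMap, an older and slower response could arrive after a newer one and overwrite it in the UI.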