I am pretty new to RxJava and I need to create repository with several datasources. It is complex to me because there are several smaller subtasks which I don't know how to implement with RxJava.

I have Dao which provides Flowable<Item> in some range to DataSource class. This data source has local cache which can be invalidated at any time. When repository asks DataSource for some range (which can be out of DataSourse bounds, bounds are unknown until fully cached) it must produce an error (or notify Repository in other way).

I want to create Flowable<Item> method for DataSource which will emit items from cache, and if needed concatenate them with Flowable<Item> dao.getRange(...), meanwhile caching new items, which come from dao. Also I need to do stuff with errors coming from dao, they must be handled or converted to higher level errors.

DataSource.class:

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    final int cacheSize = cache.size();
    final int canLoadFromCache = cacheSize - start;
    final int loadFromDao = amount - canLoadFromCache;

    if (isCorrupted) return Flowable.fromCallable(() -> {
        throw new Exception("CorruptedDatasource");
    });

    Flowable<Item> cacheFlow = null;
    Flowable<Item> daoFlow = null;

    if (canLoadFromCache > 0) {
        cacheFlow = Flowable.fromIterable(
                cache.subList(start, canLoadFromCache)
        );

        daoFlow = dao.getRange(
                uri, 
                cacheSize, //start
                loadFromDao //amount
        );
    } else {
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
    }
    // return concatenated flowable;
}

At the higher level repository concatenates data coming from several DataSources, so there must be a way to cancatinate ranges coming from several sources in a way if one haven't enought then range from next one should be added.

Please help me!

1 Answers

0
Sanlok Lee On Best Solutions

Try concat or concatEager which concatenates two observables. Also doOnNext() or doOnError() can help you caching and error handlings

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    ...
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
            .doOnNext(result -> /* insert caching logic here */)
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
            .doOnError(error -> /* handle error here */)
            .onErrorReturn(/* and/or return some empty item */)
    }
    // return concatenated flowable;
    return cacheFlow.concat(daoFlow);
}