I am trying to create a GetAndFetch method that would first return data from the cache, then fetch and return data from a webservice, and finally update the cache.
Such a function exists in akavache already, however, the data that is retrieved or stored by it is like a blob. i.e if I am interested in a rss feed I could only work at the level of whole feed and not individual items. I am interested in creating a version that returns the items as IObservable<Item>. This has the advantage that new Items can be displayed as soon they are returned by service and not wait for all the Itemss.
public IObservable<Item> GetAndFetch(IBlobCache cache, string feedUrl)
{
// The basic idea is to first get the cached objects
IObservable<HashSet<Item>> cacheBlobObject = cache.GetObject<HashSet<Item>>(feedUrl);
// Then call the service
IObservable<Item> fetchObs = service.GetItems(feedUrl);
// Consolidate the cache & the retrieved data and then update cache
IObservable<Item> updateObs = fetchObs
.ToArray()
.MyFilter() // filter out duplicates between retried data and cache
.SelectMany(arg =>
{
return cache.InsertObject(feedUrl, arg)
.SelectMany(__ => Observable.Empty<Item>());
});
// Then make sure cache retrieval, fetching and update is done in order
return cacheBlobObject.SelectMany(x => x.ToObservable())
.Concat(fetchObs)
.Concat(upadteObs);
}
The issue with my approach is that Concat(upadteObs) resubscribes to the fetchObs and ends up calling the service.GetItems(feedUrl) again which is wasteful.
You sound like you need the
.Publish(share => { ... })overload.Try this:
I'm concerned about the
Concatcalls - they might need to beMerge.Also, it seems like your call to
service.GetItemsis getting all items anyway - how is it avoiding the items already in the cache?An alternative implementation based on the comments: