How should I update multiple RavenDB documents from a daily feed?

1.1k views Asked by At

I'm using RavenDB to hold several thousand documents. The data comes from a daily xml feed which I'll update by running a C# console app. Below is the code that processes the feed to keep the database in sync with any changes. I've had quite a few problems with this so I'm wondering if I've picked the wrong strategy. Here are some important things to note.

  1. New items may have been added to the feed and existing items may have changed, so each time it runs I want to either add or update a document depending on whether or not it's new.
  2. The xml feed doesn't contain any reference to my RavenDB IDs, only its internal key for each item. So when retrieving an existing document to update I can only do that by examining the "SourceID" property on the document.
  3. I'm using "take" to only work with 500 docs at a time partly because my db is limited to 1000 docs, and partly because without Take() I seem to be only able to retrieve 128 docs.
  4. As it stands, this code falls over with a "can't do more than 30 updates in a session" error, I think because each time I try to retrieve an existing record from dbItems it actually hits the database again.
  5. I can fix the issue at (4) above by calling ToList() on items, but if I do that the existing item doesn't get updated when I call session.SaveChanges() (I'm imagining this like a disconnected recordset).

Can anyone give me some pointers?

        public void ProcessFeed(string rawXml)
        {
            XDocument doc = XDocument.Parse(rawXml);
            var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId).Take(500);
            using (IDocumentSession session = _store.OpenSession())
            {
                var dbItems = session.Query<AccItem>().OrderBy(x => x.SourceId).Take(500);
                foreach (var item in items)
                {
                    var existingRecord = dbItems.SingleOrDefault(x => x.SourceId == item.SourceId);
                    if (existingRecord == null)
                    {
                        session.Store(item);
                        _logger.Info("Saved new item {0}.", item.ShortName);
                    }
                    else
                    {
                        // update just one field for now
                        existingRecord.Village = item.Village;
                        _logger.Info("Updated item {0}.", item.ShortName);
                    }
                }
                session.SaveChanges();
            }            
        }
1

There are 1 answers

0
centralscru On BEST ANSWER

Below is the code I ended up with for this. I think the initial problem with the original version was simply that I was trying to use the same session for every item, breaking the 30 limit.

Tipped off by some code on screen in a TekPub screencast I fixed this by batching the whole process into sets of 15 (to allow for one read and one write, so 30 requests in total per batch). This is pretty slow, but not nearly slow as I'd expected. I'm expecting maybe 10,000 records at a time so I'll just leave it ticking away until it's done.

public void ProcessFeed(string rawXml)
{
    XDocument doc = XDocument.Parse(rawXml);
    var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId);
    int numberOfItems = items.Count;
    int batchSize = 15;
    int numberOfBatchesRequired = numberOfItems / batchSize;
    int numberOfBatchesProcessed = 0;
    int numberOfItemsInLastBatch = numberOfItems - (numberOfBatchesRequired * batchSize); 
    for (var i = 0;i <= numberOfBatchesRequired;i++)
    {
        using (IDocumentSession session = _store.OpenSession())
        {
            var numberOfItemsProcessedSoFar = numberOfBatchesProcessed * batchSize;
            var numberOfItemsRemaining = numberOfItems - numberOfItemsProcessedSoFar;
            int itemsToTake = 15;
            if (numberOfItemsRemaining > 0 && numberOfItemsRemaining < 15)
            itemsToTake = numberOfItemsRemaining;
            foreach (var item in items.Skip(numberOfItemsProcessedSoFar).Take(itemsToTake))
            {
            var existingRecords = session.Query<AccItem>().Where(x => x.SourceId == item.SourceId).ToList();
            if (!existingRecords.Any())
            {
                session.Store(item);
                _logger.Info("Saved new item {0}.", item.ShortName);
            }
            else
            {
                if (existingRecords.Count() > 1)
                _logger.Warn("There's more than one item in the database with the sourceid {0}", item.SourceId);
                existingRecords.First().Village = item.Village;
                _logger.Info("Updated item {0}.", item.ShortName);
            }
            session.SaveChanges();
            }
        }            
        numberOfBatchesProcessed++;
    }
}