How to implement a queue of serial network calls then processing in RxSwift?

3.4k views Asked by At

I'm working on an app where I want to achieve the following using RxSwift and RxCocoa

  1. Download JSON containing url to X number of files
  2. Download file 1, process file 1
  3. Download file 2, process file 2
  4. Download file 3, process file 3

... etc

The key here is that the processing of each file has to complete before downloading the next file. At the very least the order of the file processing must be performed in order. If I can start downloading file 2 while file 1 is processing, that would be awesome, but is not necessary.

I've tried using SerialDispatchQueueScheduler to make this work, but I since the files are of different sizes, the download of each file finishes at different times, and therefore the processing code fires in a different order than I started the downloads.

I could easily implement this without using Rx by using NSOperations and the like, but I'd like to keep using Rx in this app, as it's what I use elsewhere in this app.

Below I've included a snippet with some of the code. Comments have been added for the sake of this question.

       .flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
            return Observable.from(tasks)
                .observeOn(self.backgroundScheduler) // StackOverflow: backgroundScheduler is a SerialDispatchQueueScheduler
                .flatMapWithIndex({ [unowned self] (task, index) in
                    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
                })
                .catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
                    observable.onError(error)
                    throw error
                })
                .map({ (diffTask : DiffTaskProgress) -> DiffTaskProgress.Progress in
                    // Stack Overflow: I've wrapped much of the progress observable in a Observable<UpdateProgress>
                    switch diffTask.progress {
                    case .started(currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                        observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                    }

                    return diffTask.progress
                })
                .flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
                    switch progress {
                    case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                        return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // StackOverflow: PROCESSES THE FILE THAT WAS DOWNLOADED
                    default:
                        return Observable.empty()
                    }
                })
        }
1

There are 1 answers

0
Nailer On BEST ANSWER

I managed to solve it by using the concatMap operator. So instead of

.flatMapWithIndex({ [unowned self] (task, index) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: Downloads a file from a URL
})

I did something like this:

tasks.enumerated().concatMap { (index, task) in
    return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count)
}

The concatMap operator makes sure that the first observable is finished before emitting any more signals. I had to use enumerated() since concatMap does not come with a concatMapWithIndex, but it works :)