Swift Concurrency: Combine TaskGroup and AsyncStream?

214 views Asked by At

Context

Suppose I want to download 300 images in the background using Swift Concurrency. I'd like two things:

  1. As much parallelism as possible.
  2. My caller to receive each image as it's downloaded, rather than waiting for all of them to finish.

Approach:

TaskGroup has many advantages: great parallelism, very cheap child tasks, and cancellation. But it does not return until all 300 child tasks have finished.

AsyncStream lets me return images as they're downloaded, but has no parallelism on its own—downloads happen one at a time, in sequence.

Question:

What I'd like to do is wrap a TaskGroup with an AsyncStream, like this:

let stream = AsyncStream(NSImage.self) { continuation in
    
    _ = await withTaskGroup(of: NSImage.self, returning: [NSImage].self) { taskGroup in
        
        let imageURLs: [URL] = ... // array of 300 URLs to download
        for imageURL in imageURLs {
            taskGroup.addTask { await downloadImage(url: imageURL) }
        }

        for await result in taskGroup {
            continuation.yield(result)
        }

        continuation.finish()
        return []
    }
}

But AsyncStream can't take an async closure. So what's the best way to achieve this behavior with Swift Concurrency?

3

There are 3 answers

0
Rob On BEST ANSWER

The idea is that you would bridge from the synchronous context of AsyncStream to Swift concurrency by creating a Task for the asynchronous work. Also remember to add an onTermination closure so that it will respond to the cancelation of the AsyncStream:

func images(for urls: [URL]) -> AsyncStream<NSImage> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: NSImage.self) { group in
                for url in urls {
                    group.addTask { await self.downloadImage(url: url) }
                }

                for await image in group {
                    continuation.yield(image)
                }

                continuation.finish()
            }
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

Obviously, since you are doing all of these network requests concurrently, you must recognize that these will likely not finish in the order corresponding to the original array of URLs.

So, you might return a tuple of the original URL and the resulting image:

func images(for urls: [URL]) -> AsyncStream<(URL, NSImage)> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: (URL, NSImage).self) { group in
                for url in urls {
                    group.addTask { await (url, self.downloadImage(url: url)) }
                }

                for await tuple in group {
                    continuation.yield(tuple)
                }

                continuation.finish()
            }
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

Or perhaps an index number and the image:

func images(for urls: [URL]) -> AsyncStream<(Int, NSImage)> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: (Int, NSImage).self) { group in
                for (index, url) in urls.enumerated() {
                    group.addTask { await (index, self.downloadImage(url: url)) }
                }

                for await tuple in group {
                    continuation.yield(tuple)
                }

                continuation.finish()
            }
        }

        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}
0
workingdog support Ukraine On

I don't think your statement regarding TaskGroup: But it does not return until all 300 child tasks have finished is correct.

Here is an example code that downloads a number of images. It clearly shows that the view displays the images as they are downloaded, not when they are all downloaded.

You can use this approach to achieve your requirements.

class ViewModel: ObservableObject {
    @Published var images = [PhotoImg]()
    
    func loadImages() async  {
        let url1 = "https://sample-videos.com/img/Sample-png-image-100kb.png"
        let url2 = "https://sample-videos.com/img/Sample-png-image-200kb.png"
        let url3 = "https://sample-videos.com/img/Sample-png-image-500kb.png"
        let url4 = "https://sample-videos.com/img/Sample-png-image-1mb.png"
        
        let urls = [url1,url2,url3,url4]
        
        // get all images concurrently
        return await withTaskGroup(of: (Int, UIImage).self) { group -> Void in
            for i in 0..<urls.count {
                if let url = URL(string: urls[i]) {
                    group.addTask { await (i, self.loadImage(url: url)) }
                }
            }
            for await value in group {
                Task{ @MainActor in
                    print("-----> \(value.0) downloaded")
                    images.append(PhotoImg(key: value.0, img: value.1))
                }
            }
        }
    }
    
    private func loadImage(url: URL) async -> UIImage {
        do {
            let (data, _) = try await URLSession.shared.data(from: url)
            if let img = UIImage(data: data) { return img }
        }
        catch { print(error) }
        return UIImage()
    }
}

struct ContentView: View {
    @StateObject var vm = ViewModel()
    @State var isDone = false
    
    var body: some View {
        VStack {
            if isDone {
                Text("All images fetched")
            } else {
                ProgressView()
            }
            List(vm.images) { picture in
                Image(uiImage: picture.img)
                    .resizable()
                    .aspectRatio(contentMode: .fit)
                    .frame(width: 111, height: 111)
                    .cornerRadius(10)
            }
        }
        .task {
            await vm.loadImages()
            isDone = true
        }
    }
}

struct PhotoImg: Identifiable {
    let id = UUID()
    var key: Int
    var img: UIImage
}

struct APIResponse: Codable {
    let images: [String]
}

Note, you can easily convert this code to use the Observation framework if preferred.

0
Luca Angeletti On

Assuming you have this function

func downloadImage(url: URL) async -> NSImage {
    ...
}

You can update your code this way

let stream = AsyncStream(NSImage.self) { continuation in
    Task(priority: .background) {
        await withTaskGroup(of: NSImage.self) { taskGroup in
            
            let imageURLs: [URL] = ...
            for imageURL in imageURLs {
                taskGroup.addTask { await downloadImage(url: imageURL) }
            }

            for await result in taskGroup {
                continuation.yield(result)
            }
            
            continuation.finish()
        }
    }
}

And then use it this way

for await image in stream {
    print("Image received")
}

Of course consider the order the image appears in this last for loop is no guaranteed to be the same you follow when invoking downloadImage(url:).

Hope it helps.