TaskGroup: limiting memory usage for a large number of tasks

I'm trying to build a chunked file-upload mechanism using modern Swift Concurrency. I have a streamed file reader that reads files chunk by chunk, 1 MB at a time. It takes two closures: nextChunk: (DataChunk) -> Void and completion: () -> Void. The first is called once for every chunk-sized block of data read from the InputStream.

To make this reader compatible with Swift Concurrency, I wrote an extension that wraps it in an AsyncStream, which seemed the most suitable fit for this case.

public extension StreamedFileReader {
    func read() -> AsyncStream<DataChunk> {
        AsyncStream { continuation in
            self.read(nextChunk: { chunk in
                continuation.yield(chunk)
            }, completion: {
                continuation.finish()
            })
        }
    }
}

Using this AsyncStream, I read a file iteratively and make network calls like this:

func process(_ url: URL) async {
    // ...
    do {
        for await chunk in reader.read() {
            let request = // ...
            _ = try await service.upload(data: chunk.data, request: request)
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}

The issue is that I'm not aware of any mechanism that limits execution to at most N network calls at a time. So when I try to upload a huge file (5 GB), memory consumption grows drastically. That defeats the purpose of streamed reading: I might as well read the entire file into memory (a joke, but that is how it looks).

In contrast, with good old GCD everything works like a charm:

func process(_ url: URL) {
    let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
    let uploadGroup = DispatchGroup()
    let uploadQueue = DispatchQueue.global(qos: .userInitiated)
    uploadQueue.async(group: uploadGroup) {
        // ...
        reader.read(nextChunk: { chunk in
            let request = // ...
            uploadGroup.enter()
            semaphore.wait()
            service.upload(chunk: chunk, request: request) {
                uploadGroup.leave()
                semaphore.signal()
            }
        }, completion: { _ in
            print("read completed")
        })
    }    
}

Well, it is not exactly the same behavior, since it uses a concurrent DispatchQueue whereas the AsyncStream loop runs sequentially. So I did a little research and found that TaskGroup is probably what I need here, since it allows async tasks to run in parallel.

I tried it this way:

func process(_ url: URL) async {
    // ...
    do {
        let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
            var counter = 1
            for await chunk in reader.read() {
                let request = // ...
                group.addTask {
                    _ = try await service.upload(data: chunk.data, request: request)
                }
                counter = chunk.index
            }
            return counter
        }
    } catch let error {
        reader.cancelReading()
        print(error)
    }
}

In that case memory consumption is even higher than in the example that iterates over the AsyncStream!

I suspect I need to suspend the group or the task under some condition, and call group.addTask only when the tasks I'm about to add can actually be handled, but I have no idea how to do it.

I found this Q&A and tried calling try await group.next() on every 5th chunk, but it didn't help at all.

Is there any mechanism similar to DispatchGroup + DispatchSemaphore but for modern concurrency?

UPDATE: To better demonstrate the difference between the three approaches, here are screenshots of the memory report:

AsyncStream iterating

[memory report screenshot]

AsyncStream + TaskGroup (using try await group.next() on each 5th chunk)

[memory report screenshot]

GCD DispatchQueue + DispatchGroup + DispatchSemaphore

[memory report screenshot]

There are 3 answers

Rob (BEST ANSWER)

The key problem is the use of the AsyncStream. Your AsyncStream is reading data and yielding chunks more quickly than they can be uploaded.

Consider this MCVE where I simulate a stream of 100 chunks, 1 MB each:

import Foundation // for Data
import os.log

private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)

struct Chunk {
    let index: Int
    let data: Data
}

actor FileMock {
    let maxChunks = 100
    let chunkSize = 1_000_000
    var index = 0

    func nextChunk() -> Chunk? {
        guard index < maxChunks else { print("done"); return nil }
        defer { index += 1 }
        return Chunk(index: index, data: Data(repeating: UInt8(index & 0xff), count: chunkSize))
    }

    func chunks() -> AsyncStream<Chunk> {
        AsyncStream { continuation in
            index = 0
            while let chunk = nextChunk() {
                os_signpost(.event, log: log, name: "chunk")
                continuation.yield(chunk)
            }

            continuation.finish()
        }
    }
}

And

func uploadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        let chunks = await FileMock().chunks()
        var index = 0
        for await chunk in chunks {
            index += 1
            if index > 5 {
                try await group.next()
            }
            group.addTask { [self] in
                try await upload(chunk)
            }
        }
        try await group.waitForAll()
    }
}

func upload(_ chunk: Chunk) async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d start", chunk.index)
    try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
    os_signpost(.end, log: log, name: #function, signpostID: id, "end")
}

When I do that, I see memory spike to 150 MB as the AsyncStream rapidly yields all of the chunks upfront:

[Instruments screenshot: memory graph with signposts]

Note that all the signposts, showing when the Data objects are created, are clumped at the start of the process.


Note, the documentation warns us that the sequence might conceivably generate values faster than they can be consumed:

An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means the value is unbounded.

Unfortunately, the buffering alternatives, .bufferingOldest and .bufferingNewest, do not slow the producer down; they simply discard values once the buffer is full. In some AsyncStreams, that might be a viable solution (e.g., if you are tracking the user location, you might only care about the most recent location), but when uploading chunks of a file, you obviously cannot have it discard chunks when the buffer fills up.
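To make the dropping behavior concrete, here is a minimal sketch (the function name and the numbers are my own, purely for illustration). The producer yields five values before anything consumes the stream, and the buffer is limited to the two newest:

```swift
// Shows that a bounded buffering policy drops elements rather than
// applying back-pressure to the producer.
func collectWithNewestBuffer(limit: Int, producing count: Int) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(limit)) { continuation in
        // This closure runs synchronously inside the initializer,
        // i.e. before the `for await` loop below starts consuming.
        for value in 1...count {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Calling collectWithNewestBuffer(limit: 2, producing: 5) should hand back only the last two values; the first three are silently lost. For file chunks that would mean a corrupted upload, which is why a buffer limit is not a fix here.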


So, rather than AsyncStream, just wrap your file reading with a custom AsyncSequence, which will not read the next chunk until it is actually needed, dramatically reducing peak memory usage, e.g.:

struct FileMock: AsyncSequence {
    typealias Element = Chunk

    struct AsyncIterator : AsyncIteratorProtocol {
        let chunkSize = 1_000_000
        let maxChunks = 100
        var current = 0

        mutating func next() async -> Chunk? {
            os_signpost(.event, log: log, name: "chunk")

            guard current < maxChunks else { return nil }
            defer { current += 1 }
            return Chunk(index: current, data: Data(repeating: UInt8(current & 0xff), count: chunkSize))
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator()
    }
}

And

func uploadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        var index = 0
        for await chunk in FileMock() {
            index += 1
            if index > 5 {
                try await group.next()
            }
            group.addTask { [self] in
                try await upload(chunk)
            }
        }
        try await group.waitForAll()
    }
}

And that avoids loading all 100 MB into memory at once. Note that the vertical scale on the memory graph is different, but you can see that the peak usage is 100 MB less than in the graph above, and the signposts, showing when data is read into memory, are now distributed throughout the graph rather than clustered at the start:

[Instruments screenshot: memory graph with signposts]

Now, obviously, I am only mocking the reading of a large file with Chunk/Data objects and mocking the upload with a Task.sleep, but it hopefully illustrates the basic idea.
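For a real file, the mock might be replaced by something along these lines. This is only a sketch under my own assumptions: FileChunk and FileChunkSequence are hypothetical names, the read uses FileHandle.read(upToCount:) (which needs a reasonably recent OS), and the read itself is a blocking call made from inside next(), which may or may not be acceptable for your app.

```swift
import Foundation

/// Hypothetical chunk type, mirroring the `Chunk` used in the mock.
struct FileChunk {
    let index: Int
    let data: Data
}

/// Reads a file lazily: no bytes are read until `next()` is called.
struct FileChunkSequence: AsyncSequence {
    typealias Element = FileChunk

    let url: URL
    let chunkSize: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        let url: URL
        let chunkSize: Int
        private var handle: FileHandle?
        private var index = 0
        private var finished = false

        init(url: URL, chunkSize: Int) {
            self.url = url
            self.chunkSize = chunkSize
        }

        mutating func next() async throws -> FileChunk? {
            guard !finished else { return nil }
            // Open the file on first use so that errors surface to the caller.
            if handle == nil {
                handle = try FileHandle(forReadingFrom: url)
            }
            // A nil or empty result means end of file.
            guard let data = try handle?.read(upToCount: chunkSize), !data.isEmpty else {
                finished = true
                try handle?.close()
                return nil
            }
            defer { index += 1 }
            return FileChunk(index: index, data: data)
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(url: url, chunkSize: chunkSize)
    }
}
```

Because next() can throw, the consuming loop becomes for try await chunk in FileChunkSequence(url: url, chunkSize: 1_000_000), and it drops into the uploadAll function above in place of FileMock().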

Bottom line, do not use AsyncStream to read the file, but rather consider a custom AsyncSequence or other pattern that reads the file in as the chunks are needed.


A few other observations:

  • You said “tried to put try await group.next() for each 5th chunk”. Perhaps you can show us what you tried. But note that this answer didn’t say “each 5th chunk” but rather “every chunk after the 5th”. We cannot comment on what you tried unless you show us what you actually tried (or provide a MCVE). And as the above shows, using Instruments’ “Points of Interest” tool can show the actual concurrency.

  • By the way, when uploading a large asset, consider using a file-based upload rather than Data. File-based uploads are far more memory efficient: regardless of the size of the asset, the memory used during a file-based upload will be measured in KB. You can even turn off chunking entirely, and the upload will still use very little memory. That minimal memory footprint is one of the reasons we do file-based uploads with URLSession.

  • The other reason for file-based uploads is that, for iOS especially, one can marry the file-based upload with a background session. With a background session, the user can even leave the app to do something else, and the upload will continue to operate in the background. At that point, you can reassess whether you even need/want to do chunking at all.
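As a rough illustration of the file-based approach from the last two points, here is a sketch that wraps URLSession's uploadTask(with:fromFile:completionHandler:) in a continuation. The helper names, the PUT method and the Content-Type header are assumptions of mine; adjust them to whatever your server expects.

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking // needed for URLSession on non-Apple platforms
#endif

/// Builds the request for a raw file upload.
/// The method and header are assumptions, not requirements of URLSession.
func makeUploadRequest(to endpoint: URL) -> URLRequest {
    var request = URLRequest(url: endpoint)
    request.httpMethod = "PUT"
    request.setValue("application/octet-stream", forHTTPHeaderField: "Content-Type")
    return request
}

/// Uploads the file at `fileURL` without first loading it into a `Data`.
func uploadFile(at fileURL: URL,
                to endpoint: URL,
                session: URLSession = .shared) async throws -> URLResponse {
    let request = makeUploadRequest(to: endpoint)
    return try await withCheckedThrowingContinuation { continuation in
        let task = session.uploadTask(with: request, fromFile: fileURL) { _, response, error in
            // Resume exactly once, whichever way the task ended.
            if let error = error {
                continuation.resume(throwing: error)
            } else if let response = response {
                continuation.resume(returning: response)
            } else {
                continuation.resume(throwing: URLError(.badServerResponse))
            }
        }
        task.resume()
    }
}
```

On recent Apple platforms the same thing can be written more directly with the async upload(for:fromFile:) method. Note that a background session, as far as I know, does not accept completion-handler tasks, so for that case you would create the upload task without a handler and receive the result through the session delegate instead.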

Confused Vorlon

I wanted to be able to put async tasks into a queue, something like an NSOperationQueue. I want to limit the maximum number of concurrent operations, and also to set priorities so that high priority tasks are pulled from the queue before low priority ones.

An Apple engineer at a WWDC lab pointed out that you can use withCheckedContinuation to suspend a task. This provides a continuation which you can then resume to restart the task later.

This is the key for my Runner.

You create a runner with

static let analysis = Runner(maxTasks: 2)

then add a task to it with

try await Runner.analysis.queue(priority: Runner.Priority.high) {
    [weak self] in
    //Do work here
    try await doSomethingExpensive()
}

The Runner is as follows...

import Foundation

protocol HasPriority {
    var priority:Double {get}
}



actor Runner  {
    //MARK: Initialisers
    
    /// Create runner with max tasks
    /// - Parameter maxTasks: count
    init(maxTasks: Int) {
        self.maxTasks = maxTasks
    }
    
    //MARK: Static/Class constants
    
    //MARK: Structures (enums / errors / notifications / etc)
    
    /// Concrete implementation of HasPriority
    enum Priority:HasPriority {
        case high //100
        case medium //50
        case low //0
        case custom(Double)
        //Note - date variants are not compatible with the other cases, and oldestFirst is not compatible with newestFirst
        case oldestFirst(Date)
        case newestFirst(Date)
        
        var priority: Double {
            switch self {
            case .high:
                return 100
            case .medium:
                return 50
            case .low:
                return 0
            case .custom(let value):
                return value
            case .oldestFirst(let date):
                return -date.timeIntervalSince1970
            case .newestFirst(let date):
                return date.timeIntervalSince1970
            }
        }
    }
    
    /// Tickets hold priority and continuation information.
    /// These are only modified or read by the actor after initial creation, so we don't have to worry about concurrency
    /// They're the operation holder
    private class Ticket:Identifiable {
        internal init(priority: Double,runner:Runner) {
            self.priority = priority
            self.runner = runner
        }

        let id = UUID()
        let priority:Double
        private var runner:Runner
        var continuation:CheckedContinuation<Void, Never>?
        var running:Bool = false
        
        func didFinish() async {
            await runner.didFinish(self)
        }
        
        func run() {
            running = true
            continuation!.resume()
        }
    }
    
    //MARK: Published vars
    
    //MARK: Vars

    private let maxTasks:Int
    
    //MARK: Coding Keys
    
    //MARK: Class Methods
    
    //MARK: Instance Methods
    
    
    /// Current running count
    private var runningCount:Int {
        return tickets.filter({ $0.running }).count
    }
    
    /// called to progress the ticket queue
    private func progress() {
        while(runningCount < maxTasks) {
            let notRunning = tickets.filter { !$0.running }
            let topPriority = notRunning.max { t1, t2 in
                t1.priority < t2.priority
            }
            
            guard let topPriority else {
                print("Queue emptied")
                return
            }
            
            //there may be multiple elements with max priority. If so, we want to run the first
            guard let next = notRunning.first(where: { $0.priority == topPriority.priority  }) else {
                fatalError("this should not be possible")
            }
            
            next.run()
        }
    }
    
    /// Must be called when a ticket finishes to remove it from the queue
    /// - Parameter ticket: ticket
    private func didFinish(_ ticket:Ticket) {
        //print("did finish ticket with priority: \(ticket.priority)")
        tickets.removeAll { $0.id == ticket.id }
        progress()
    }

    private var tickets:[Ticket] = []
    
    /// Used in the continuation to add a ticket
    /// - Parameter ticket: ticket
    nonisolated
    private func add(_ ticket:Ticket) {
        Task {
            await append(ticket)
        }
    }
    
    /// Actor isolated function to add ticket
    /// - Parameter ticket: ticket
    private func append(_ ticket:Ticket) {
        precondition(ticket.continuation != nil)
        tickets.append(ticket)
        progress()
    }
    
    /// Queue an async task. The task is suspended (so no thread is required), then run according to priority in the queue
    /// If priorities are equal, then tasks are run in order of submission
    /// - Parameters:
    ///   - priority: Use Runner.Priority or create your own enum which conforms to HasPriority
    ///   - work: the async work to do
    /// - Returns: the task return value
    nonisolated
    func queue<Success>(priority:HasPriority, work:@escaping  (() async throws -> Success) ) async throws -> Success {
        let ticket:Ticket = Ticket(priority: priority.priority, runner: self)
        
        defer {
            Task {
                await didFinish(ticket)
            }
        }
        
        await withCheckedContinuation({ continuation in
            ticket.continuation = continuation
            self.add(ticket)
            return ()
        })
        
        //If task has been cancelled while in the queue - we'll find out when we pull it off the queue and run it
        try Task.checkCancellation()
        
        return try await work()
        
    }
}
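
If you only need the concurrency limit and not the priorities, the same withCheckedContinuation trick can be boiled down to a small counting semaphore. This is a simplified sketch, not a replacement for the Runner above: AsyncSemaphore, ConcurrencyProbe and runJobs are illustrative names of my own, and cancellation of a waiting task is not handled.

```swift
/// A minimal counting semaphore for Swift concurrency (no cancellation handling).
actor AsyncSemaphore {
    private var permits: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(value: Int) {
        permits = value
    }

    /// Suspends the caller, without blocking a thread, until a permit is free.
    func wait() async {
        await withCheckedContinuation { continuation in
            if permits > 0 {
                permits -= 1
                continuation.resume()
            } else {
                waiters.append(continuation)
            }
        }
    }

    /// Releases a permit, handing it straight to the oldest waiter if there is one.
    func signal() {
        if waiters.isEmpty {
            permits += 1
        } else {
            waiters.removeFirst().resume()
        }
    }
}

/// Records how many jobs overlap, to check that the limit holds.
actor ConcurrencyProbe {
    private var current = 0
    private(set) var peak = 0
    func enter() { current += 1; peak = max(peak, current) }
    func leave() { current -= 1 }
}

/// Runs `count` dummy jobs, at most `limit` at a time, and returns the observed peak.
func runJobs(count: Int, limit: Int) async -> Int {
    let semaphore = AsyncSemaphore(value: limit)
    let probe = ConcurrencyProbe()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<count {
            group.addTask {
                await semaphore.wait()
                await probe.enter()
                try? await Task.sleep(nanoseconds: 5_000_000) // stand-in for real work
                await probe.leave()
                await semaphore.signal()
            }
        }
    }
    return await probe.peak
}
```

Bear in mind that this limits how many jobs run at once, not how much data is held by jobs that are still waiting. To keep memory bounded in the upload case, the loop that reads chunks would need to await semaphore.wait() before reading the next chunk and signal() when that upload finishes, which mirrors the GCD version in the question.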

jules

Another possibility might be to use a Combine PassthroughSubject to publish the chunks, then use the publisher's values property to obtain an AsyncSequence. You would still need withThrowingTaskGroup, as Rob suggests in his answer, but you would no longer need to build your own custom AsyncSequence.