I'm trying to build a chunked file uploading mechanism using modern Swift Concurrency.
There is a streamed file reader which I use to read files chunk by chunk, 1 MB at a time. It has two closures, nextChunk: (DataChunk) -> Void and completion: () -> Void. The first one is called once for every chunk-sized piece of data read from the InputStream.
In order to make this reader compliant with Swift Concurrency, I made an extension that creates an AsyncStream, which seems to be the most suitable fit for such a case:
public extension StreamedFileReader {
    func read() -> AsyncStream<DataChunk> {
        AsyncStream { continuation in
            self.read(nextChunk: { chunk in
                continuation.yield(chunk)
            }, completion: {
                continuation.finish()
            })
        }
    }
}
Using this AsyncStream, I read the file iteratively and make network calls like this:
func process(_ url: URL) async {
    // ...
    do {
        for await chunk in reader.read() {
            let request = // ...
            _ = try await service.upload(data: chunk.data, request: request)
        }
    } catch {
        reader.cancelReading()
        print(error)
    }
}
The issue is that there is no limiting mechanism I'm aware of that prevents more than N network calls from executing at once. Thus, when I try to upload a huge file (5 GB), memory consumption grows drastically. Because of that, the idea of streamed file reading makes no sense, as it would be easier to read the entire file into memory (it's a joke, but that's what it looks like).
In contrast, if I use good old GCD, everything works like a charm:
func process(_ url: URL) {
    let semaphore = DispatchSemaphore(value: 5) // limit to no more than 5 requests at a given time
    let uploadGroup = DispatchGroup()
    let uploadQueue = DispatchQueue.global(qos: .userInitiated)
    uploadQueue.async(group: uploadGroup) {
        // ...
        reader.read(nextChunk: { chunk in
            let request = // ...
            uploadGroup.enter()
            semaphore.wait()
            service.upload(chunk: chunk, request: request) {
                uploadGroup.leave()
                semaphore.signal()
            }
        }, completion: {
            print("read completed")
        })
    }
}
Well, it is not exactly the same behavior, as it uses a concurrent DispatchQueue, whereas the AsyncStream runs sequentially.
So I did a little research and found out that TaskGroup is probably what I need in this case: it allows running async tasks in parallel, and so on. I tried it this way:
func process(_ url: URL) async {
    // ...
    do {
        let totalParts = try await withThrowingTaskGroup(of: Void.self) { [service] group -> Int in
            var counter = 1
            for await chunk in reader.read() {
                let request = // ...
                group.addTask {
                    _ = try await service.upload(data: chunk.data, request: request)
                }
                counter = chunk.index
            }
            return counter
        }
    } catch {
        reader.cancelReading()
        print(error)
    }
}
In that case, memory consumption is even higher than in the AsyncStream iteration example! I suspect there should be some condition under which I suspend the group (or a task, or something) and call group.addTask only when the tasks I'm about to add can actually be handled, but I have no idea how to do it.
I found this Q/A and tried to put try await group.next() for each 5th chunk, but it didn't help me at all.
Is there any mechanism similar to DispatchGroup + DispatchSemaphore, but for modern concurrency?
UPDATE: To better demonstrate the difference between all three approaches, here are screenshots of the memory report.
The key problem is the use of the AsyncStream. Your AsyncStream is reading data and yielding chunks more quickly than they can be uploaded.

Consider this MCVE where I simulate a stream of 100 chunks, 1 MB each:
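A minimal sketch of such a mock reader (the Chunk type, the signpost log, and the background queue are assumptions for illustration):

import Foundation
import os.log

let poi = OSLog(subsystem: "Test", category: .pointsOfInterest)

struct Chunk: Sendable {
    let index: Int
    let payload: Data
}

final class StreamedFileReader {
    /// Mock a 100 MB file read: yield 100 chunks of 1 MB each, as fast as they can be "read".
    func read() -> AsyncStream<Chunk> {
        AsyncStream { continuation in
            DispatchQueue.global().async {
                for index in 0 ..< 100 {
                    // The Ⓢ signpost in Instruments marks when each `Data` is created.
                    os_signpost(.event, log: poi, name: "Data created", "chunk %d", index)
                    continuation.yield(Chunk(index: index, payload: Data(repeating: 0, count: 1_000_000)))
                }
                continuation.finish()
            }
        }
    }
}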
And a sketch of consuming it, with Task.sleep mocking each upload (the one-second duration is an assumption):
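func process() async throws {
    let reader = StreamedFileReader()

    for await chunk in reader.read() {
        // Mock the upload; in real code this would be the network request.
        try await Task.sleep(nanoseconds: 1_000_000_000)
        print("uploaded chunk", chunk.index)
    }
}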
When I do that, I see memory spike to 150 MB as the AsyncStream rapidly yields all of the chunks upfront. Note that all the Ⓢ signposts, showing when the Data objects are created, are clumped at the start of the process.

Note, the documentation warns us that the sequence might conceivably generate values faster than they can be consumed, which is why AsyncStream offers its buffering policies (the default being .unbounded).
Unfortunately, the various buffering alternatives, .bufferingOldest and .bufferingNewest, will only discard values when the buffer is filled. In some AsyncStreams, that might be a viable solution (e.g., if you are tracking the user location, you might only care about the most recent location), but when uploading chunks of a file, you obviously cannot have it discard chunks when the buffer is exhausted.

So, rather than AsyncStream, just wrap your file reading with a custom AsyncSequence, which will not read the next chunk until it is actually needed, dramatically reducing peak memory usage, e.g.:
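A minimal sketch of such a sequence (the ChunkSequence name and the FileHandle-based reading are assumptions; it reuses the Chunk type from above):

struct ChunkSequence: AsyncSequence {
    typealias Element = Chunk

    let fileURL: URL
    let chunkSize: Int

    func makeAsyncIterator() -> Iterator {
        Iterator(fileURL: fileURL, chunkSize: chunkSize)
    }

    struct Iterator: AsyncIteratorProtocol {
        let fileURL: URL
        let chunkSize: Int
        var handle: FileHandle?
        var index = 0

        init(fileURL: URL, chunkSize: Int) {
            self.fileURL = fileURL
            self.chunkSize = chunkSize
        }

        mutating func next() async throws -> Chunk? {
            if handle == nil {
                handle = try FileHandle(forReadingFrom: fileURL)
            }
            // Nothing is read until the consumer awaits the next element,
            // so only one chunk needs to be in memory at a time.
            guard let data = try handle?.read(upToCount: chunkSize), !data.isEmpty else {
                try handle?.close()
                handle = nil
                return nil
            }
            defer { index += 1 }
            return Chunk(index: index, payload: data)
        }
    }
}

Because next() can throw, you iterate it with for try await chunk in ChunkSequence(fileURL: url, chunkSize: 1_000_000) { … }.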
And that avoids loading all 100 MB in memory at once. Note, the vertical scale on memory is different, but you can see that the peak usage is 100 MB less than in the above graph, and the Ⓢ signposts, showing when data is read into memory, are now distributed throughout the graph rather than all at the start.

Now, obviously, I am only mocking the reading of a large file with Chunk/Data objects and mocking the upload with a Task.sleep, but it hopefully illustrates the basic idea.

Bottom line, do not use AsyncStream to read the file, but rather consider a custom AsyncSequence or other pattern that reads the file in as the chunks are needed.

A few other observations:
You said “tried to put try await group.next() for each 5th chunk”. Perhaps you can show us what you tried. But note that this answer didn’t say “each 5th chunk” but rather “every chunk after the 5th”. We cannot comment on what you tried unless you show us what you actually tried (or provide a MCVE). And as the above shows, using Instruments’ “Points of Interest” tool can show the actual concurrency.
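For reference, a sketch of the pattern described in that answer (assuming the ChunkSequence above and a hypothetical upload(_:) routine), which keeps no more than five uploads in flight:

func process(_ url: URL) async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        var count = 0
        for try await chunk in ChunkSequence(fileURL: url, chunkSize: 1_000_000) {
            count += 1
            if count > 5 {
                // For every chunk after the 5th, wait for one in-flight
                // upload to finish before starting another.
                _ = try await group.next()
            }
            group.addTask {
                try await upload(chunk) // hypothetical upload routine
            }
        }
        try await group.waitForAll()
    }
}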
By the way, when uploading a large asset, consider using a file-based upload rather than Data. File-based uploads are far more memory efficient. Regardless of the size of the asset, the memory used during a file-based upload will be measured in KB. You can even turn off chunking entirely, and a file-based upload will use very little memory regardless of the file size. URLSession file uploads have a minimal memory footprint, which is one of the reasons we do file-based uploads.

The other reason for file-based uploads is that, for iOS especially, one can marry the file-based upload with a background session. With a background session, the user can even leave the app to do something else, and the upload will continue to operate in the background. At that point, you can reassess whether you even need/want to do chunking at all.
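A sketch of such a file-based upload using the async URLSession API (the endpoint and request details are assumptions):

func upload(fileAt fileURL: URL, to endpoint: URL) async throws {
    var request = URLRequest(url: endpoint)
    request.httpMethod = "POST"

    // URLSession streams the body from disk, so memory use stays in the KB range
    // no matter how large the file is.
    let (_, response) = try await URLSession.shared.upload(for: request, fromFile: fileURL)

    guard let http = response as? HTTPURLResponse, (200 ..< 300).contains(http.statusCode) else {
        throw URLError(.badServerResponse)
    }
}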