Context
I have 2 AsyncThrowingStream(s)
which push (respectively) the first 5 even (and odd) non negative integers.
var evens = stream(start: 0) // 0, 2, 4, 6, 8
var odds = stream(start: 1) // 1, 3, 5, 7, 9
Every value is pushed after 1...3
seconds.
Concurrency
Since the 2 streams need some time to push the next value, I want to run them in concurrency.
E.g.
Task {
let evens = stream(start: 0)
for try await even in evens {
}
}
Task {
let odds = stream(start: 1)
for try await odd in odds {
}
}
Processing in order
Next, I have this function
func process(value: Int) {
print(value)
}
Question
How can I invoke process(value:)
for all the generated Int(s)
in order?
The result invocations should look like this
process(value: 0)
process(value: 1)
process(value: 2)
process(value: 3)
process(value: 4)
process(value: 5)
process(value: 6)
process(value: 7)
process(value: 8)
process(value: 9)
More
For reference and testing I am adding a mock implementation to generate the 2 streams
func stream(start: Int) -> AsyncThrowingStream<Int, Error> {
.init { continuation in
Task {
for index in stride(from: start, to: 10, by: 2) {
continuation.yield(index)
let random = UInt64.random(in: (1...3))
try await Task.sleep(nanoseconds: random * 1_000_000_000)
}
continuation.finish()
}
}
}
Thanks.
Update 1: Example for sync context
To be clear, if this was a sync context, my solution would look like this
let evens = [0, 2, 4, 6, 8]
let odds = [1, 3, 5, 7, 9]
zip(evens, odds)
.map { [$0, $1] }
.flatMap { $0 }
.forEach(process)
Output:
0
1
2
3
4
5
6
7
8
9
Update 2: Priority Queue
Another requirement that may not be obvious from the question is that process(value:)
must be called as soon as the next integer is produced by the streams. the successor of the last processed
integer is produced by any of the 2 streams.
In other words:
process(value:)
must be called with arguments following the natural order of the non negative integers(0, 1, 2, 3, 4)
.- AND
process(value:)
must be called as soon as possible (as the next integer has been produced by any of the streams).
Example
evens pushes 0
then process(value:0) is called
odds pushed 1
then process(value:1) is called
odds pushed 3
then 3 is put on hold
evens pushes 2
then process(value:2) is called
and process(value:3) is called
A Priority Queue sounds like a good fit for this case, but I am looking for the correct implementation compatible with AsyncThrowingStream and Swift Concurrency.
In a broader observation, you can
zip
ormerge
sequences using Apple’s Swift Async Algorithms package. For example, you said:If you use Apple’s Swift Async Algorithms package, you can (a) make your async sequences by just adding
async
; and (b)zip
the two sequences into one. E.g.:This will, for example, not process the third odd element until both the third even and odd elements were emitted.
So, imagine that these two sequences emitted the values in this order:
[0, 1, 3, 2, 5, 7, 4, 9, 6, 8]
. Thezip
approach will process them in order,[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
.On the other hand, if you wanted to process them as they come in, you might use
merge
Considering the above example, this
merge
rendition will process them in the order that they were encountered, e.g.,[0, 1, 3, 2, 5, 7, 4, 9, 6, 8]
.To illustrate the difference between
zip
andmerge
, let me share some RxMarbles-like diagrams, which visually demonstrate these two sequence combining functions. In each diagram, the top two arrows are the timeline for the respective two input sequences, and the bottom arrow is the timeline for the resulting output sequence.E.g.,
zip
:As opposed to
merge
:Hopefully, that helps illustrate what is going on.