I've noticed that my Combine pipeline sometimes gets "stuck" when I use "DispatchQueue.global()" as the scheduler for "delay".
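for i in 0..<100 {
    // Concurrent global queue used as the scheduler for the delay
    let scheduler = DispatchQueue.global()
    let timeoutPublisher = Just<Int>(0)
        .delay(for: .seconds(0.1), scheduler: scheduler)
    let anotherJust = Just(1)
    var ok = false
    self.cancellable = anotherJust
        .merge(with: timeoutPublisher)
        .sink { [weak self] state in
            // Expect the delayed 0 to arrive after the immediate 1
            if state == 0 {
                ok = true
            }
        }
    // Wait 1 second so the delayed value has time to arrive
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    // ...
}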
In this code snippet, "ok" is sometimes never set to true. It happens only occasionally, so I suspect a threading issue. When the pipeline gets into that invalid state, the value "0" is never published.
When I switch the scheduler to "DispatchQueue.main", it always works.
Could anyone explain what I am missing here?
The problem has to do with timing, specifically with when the pipelines publish their "finished" events.
I took your code and wrapped it up in an "XCTestCase". Then I started a timer at intervals of 1 second and ran the test, watching the output until the test failed.
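A minimal sketch of how this kind of logging could be set up, assuming Combine's "print" operator is attached to both the delayed publisher and the merged pipeline. The function name "runLoggedOnce" and the label "delayed 0" are hypothetical, and "just 1" is assumed to label the merged pipeline; this is not the exact test that was run.

```swift
import Combine
import Foundation

/// Runs the pipeline once with logging enabled and reports whether the
/// delayed 0 reached the sink. Hypothetical helper, for illustration only.
func runLoggedOnce(scheduler: DispatchQueue) -> Bool {
    let lock = NSLock()
    var sawZero = false

    // Log every event of the delayed publisher (hypothetical label).
    let delayed = Just(0)
        .delay(for: .seconds(0.1), scheduler: scheduler)
        .print("delayed 0")

    // Log every event of the merged pipeline (label assumed from the transcripts).
    let cancellable = Just(1)
        .merge(with: delayed)
        .print("just 1")
        .sink { value in
            if value == 0 {
                lock.lock()
                sawZero = true
                lock.unlock()
            }
        }

    // Block the calling thread long enough for the 0.1 s delay to elapse.
    Thread.sleep(forTimeInterval: 1)
    cancellable.cancel()

    lock.lock()
    defer { lock.unlock() }
    return sawZero
}
```

Calling "runLoggedOnce(scheduler: DispatchQueue.global())" in a loop and comparing the printed order of the value and "finished" events between passing and failing runs is one way to look for the ordering difference.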
Here's the transcript from a successful test:
You can see that the "just 1" pipeline received the value 0 before it received its "finished" event, and the test succeeded.
Here's an example of a failed test:
Notice that both "finished" events arrived before the "just 1" pipeline received a value of 0.
The global dispatch queues are not serial queues, so I suspect that the "delay" operator is creating separate, parallel blocks for the "publish a 0" event and the "publish a finish" event. Your test succeeds or fails depending on which of those gets delivered first. The main queue is a serial queue, so when you use it the problem goes away.
Try using a serial queue that you create yourself as the scheduler for your "delay" and see if that resolves your problem.
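A minimal sketch of that suggestion, assuming a privately created serial queue is passed as the scheduler for "delay". The helper "runOnce(delayScheduler:)" and the queue label are hypothetical names used only for illustration. It returns whatever values the sink received before the merged pipeline finished, or before a 1 second timeout.

```swift
import Combine
import Foundation

/// Runs the merged pipeline once and returns the values the sink received.
/// Hypothetical helper, for illustration only.
func runOnce(delayScheduler: DispatchQueue) -> [Int] {
    let lock = NSLock()
    var received: [Int] = []
    let done = DispatchSemaphore(value: 0)

    let cancellable = Just(1)
        .merge(with: Just(0).delay(for: .seconds(0.1), scheduler: delayScheduler))
        .sink(
            receiveCompletion: { _ in done.signal() },
            receiveValue: { value in
                lock.lock()
                received.append(value)
                lock.unlock()
            }
        )

    // Wait for the merged pipeline to finish, giving up after 1 second.
    _ = done.wait(timeout: .now() + 1)
    cancellable.cancel()

    lock.lock()
    defer { lock.unlock() }
    return received
}

// A DispatchQueue created without `attributes: .concurrent` is serial,
// so blocks submitted to it run one at a time in submission order.
let serialQueue = DispatchQueue(label: "example.delay.serial") // hypothetical label
print(runOnce(delayScheduler: serialQueue))
```

If the diagnosis above is right, the serial queue should deliver the value before the completion on every run, whereas passing "DispatchQueue.global()" to the same helper may occasionally return a result without the 0.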