Consider the following piece of code taken from the example walkthrough of core.async:
(let [c1 (chan)
c2 (chan)]
(thread
(while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))
My assumption is that the thread has a reference to both channels c1
and c2
and will basically run forever trying to take values from either which will never come. So, neither will the channels be garbage collected nor will the thread terminate. Even if we explicitly close!
the channels, the thread would still continue. Is my conclusion correct or am I missing something?
I'm asking because I'm trying to find a way on how I can successfully test such core.async code with such an endlessly running consumer. My current attempt looks like this:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
(async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done)))))
This returns a result channel (from thread
) of which I would like to blocking take the :done
value, but I would need a way of closing (at least one of) the channels. I could return a list of the two channels c1, c2
and the result channel returned by thread
and then close!
e.g. c1
afterwards and check the result channel, but that's extremely ugly:
(let [c1 (chan)
c2 (chan)]
(go
(>!! c1 "hi")
(>!! c2 "there"))
[c1 c2 (async/thread
(loop [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch)
(when-let [[nv nch] (alts!! [c1 c2])]
(if nv
(recur [nv nch])
:done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>
(let [[c1 c2 resultchan] *1]
(close! c1)
(<!! resultchan))
=>:done
Alternatively, I could probably send a special "End-of-Communication" value, which I could then check on the receiving side.
What does the best practice look like for this?
The idea with sending a special
end-of-communication
doesn't work when you're not sending simple values, because you can't guarantee that values will be put and taken in the order you intend them to have.The next idea would be that sender and receiver both know in advance about the number of values to process, e.g. like this:
This works in principle, but has the obvious drawback that you have to agree on the amount before setting up the sending and receiving processes and the less obvious drawback that in case that something goes wrong on the sending side, you'll end up waiting endlessly on the receiving side again (assuming that the sending side does more than just putting values onto the channel).
I've come to the conclusion that the only way to make this reliable is to use a
timeout
channel, like this: