When will channel be discarded when a thread keeps taking from it?

242 views Asked by At

Consider the following piece of code taken from the example walkthrough of core.async:

(let [c1 (chan)
      c2 (chan)]
   (thread 
      (while true
         (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
   (>!! c1 "hi")
   (>!! c2 "there"))

My assumption is that the thread has a reference to both channels c1 and c2 and will basically run forever trying to take values from either which will never come. So, neither will the channels be garbage collected nor will the thread terminate. Even if we explicitly close! the channels, the thread would still continue. Is my conclusion correct or am I missing something?

I'm asking because I'm trying to find a way on how I can successfully test such core.async code with such an endlessly running consumer. My current attempt looks like this:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
             (recur [nv nch])
             :done)))))

This returns a result channel (from thread) of which I would like to blocking take the :done value, but I would need a way of closing (at least one of) the channels. I could return a list of the two channels c1, c2 and the result channel returned by thread and then close! e.g. c1 afterwards and check the result channel, but that's extremely ugly:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    [c1 c2 (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
              (recur [nv nch])
              :done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
   Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
   Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>

(let [[c1 c2 resultchan] *1]
  (close! c1)
  (<!! resultchan))
=>:done

Alternatively, I could probably send a special "End-of-Communication" value, which I could then check on the receiving side.

What does the best practice look like for this?

2

There are 2 answers

0
schaueho On BEST ANSWER

The idea with sending a special end-of-communication doesn't work when you're not sending simple values, because you can't guarantee that values will be put and taken in the order you intend them to have.

The next idea would be that sender and receiver both know in advance about the number of values to process, e.g. like this:

user> (<!!
        (let [c1 (chan)
              values ["hi" "there"]
              vcount (count values)]
           (doseq [value values]
             (thread
                  (>!! c1 value)))
           (thread
               (loop [recvalue (<!! c1)
                      reccount 1]
                  (println "Read" recvalue)
                  (if (= reccount vcount)
                      (do (close! c1)
                          :done)
                      (recur (<!! c1) (inc reccount)))))))
Read hi
Read there
:done

This works in principle, but has the obvious drawback that you have to agree on the amount before setting up the sending and receiving processes and the less obvious drawback that in case that something goes wrong on the sending side, you'll end up waiting endlessly on the receiving side again (assuming that the sending side does more than just putting values onto the channel).

I've come to the conclusion that the only way to make this reliable is to use a timeout channel, like this:

(<!! (let [c1 (chan)
           tchan (timeout 1000) 
           values ["hi" "there"]]
       (doseq [value values]
         (thread
           (>!! c1 value)))
       (thread
          (loop [[recvalue rchan] (alts!! [c1 tchan])
                 timeoutchan tchan]
            (if (= rchan timeoutchan)
                (do (close! c1)
                    :done)
                (do (println "Read" recvalue)
                    (let [newtimeout (timeout 1000)]
                        (recur (alts!! [c1 newtimeout])
                               newtimeout))))))

Read hi
Read there
:done
0
Viktor K. On

I don't know whether there is such a thing like best practice for this particular case. Here is my solution for the problem. I think it's quite simple.

(defn alts-while-open [f & chans]
   (let [a-chans (atom (set chans))]
     (go (while (< 0 (count @a-chans))
           (println "iteration started : " (vec @a-chans))
           (let [[v ch] (alts! (vec @a-chans))]
             (if v
               (f v ch)
               (swap! a-chans #(disj % ch))))))))

The function f is executed on the result of alts! while the channels are open. I keep here an atom with set of open channels. Once I find a channel that is closed I remove it from this set. If there are no more opened channels, the while loop stops. You can run it :

(def c1 (chan))
(def c2 (chan))
(def c3 (chan))
(alts-while-open (fn [v ch] (println v)) c1 c2 c3)

Now when something is written to any of these channels it gets printed out. You can see that new iteration starts after that. Once you close a channel you can see that an iteration started but the vector of channels is reduced. Once all channels are closed the while loop stops.

It's hard to answer question whether to use close! function or some other notification mechanism to stop the loop. I think it depends on situation. If there isn't any complex handling of "end of communication" I would just close! the channel. If there is more complex logic -e.g. there is successful "end of communication" and failed "end of communication" options and I want to handle them differently, then I would rather send a special message. Something like

[:end-of-communication :success]