Cursive: Clojure's *out*, different Writers, flushing and ordering inconsistency when multithreaded: what is going on?


tl;dr Why does Clojure use a separate Writer for threads in a newFixedThreadPool? Why might it be flushed only after the pool has terminated? Why can the behaviour only be reproduced in Cursive?

Suppose we have an application that does something in separate threads, and that something writes to stdout. Suppose that after we've done everything, we want to print a final message.

The first thing we'll run into is that Clojure's println, when given multiple arguments, can produce interleaved output. This is covered here.

But there seems to be another problem. If we run something like this:

(defn main []
  (let [pool (make-pool num-threads)]
    (print-multithreaded pool "Hello, world!")
    (shutdown-pool pool))
  (safe-println "All done, have a nice day."))

We'll sometimes have

Hello, world!
All done, have a nice day.

and sometimes

All done, have a nice day.
Hello, world!

Maybe flush after each write?

(defn safe-println [& more]
  (.write *out* (str (clojure.string/join " " more) "\n"))
  (.flush *out*))

Doesn't work. What works is resorting to explicit Java interop on top of System.out, like this:

(defn safe-println [& more]
  (let [writer System/out]
    (.println writer (clojure.string/join " " more))
    (.flush writer)))

Making writer a (PrintWriter. System/out) or (OutputStreamWriter. System/out) also works.

Seems like we have different *out*s in our threads... Indeed,

(def out *out*)
(defn safe-println [& more]
  (.write out (str (clojure.string/join " " more) "\n"))
  (.flush out))

works.

So here's the question: why is this happening? With the Java pieces, it makes sense: System.out is static final, so only one instance exists for all threads, everything talks to it, and everything is appended to the same buffer. When printing to Clojure's *out*, the main thread and the pooled threads each have their own *out*, with their own buffers (for the main thread it's a PrintWriter; for the pooled threads it's a shared OutputStreamWriter). I don't really get why it's like that in the first place, and I don't really get why it results in inconsistent ordering: we explicitly finish all our threads before calling the final print, which should induce an implicit flush. But even if we add an explicit flush, the result stays the same.

I might be missing some really obvious detail here, and I'd be glad if you helped me out. If you'd like to see the whole reproducible example, which I don't include here because of its length, here's a link to the gist: https://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de

EDIT: First version of gist actually works and you have to tinker with it to break it, so I edited it to demonstrate "incorrect" behaviour from the start.

Also, to remove any misunderstandings, here's a screenshot from Cursive: https://ibb.co/jHqSL0

EDIT2: This was pointed out in the original question, but I'll emphasize it. Understanding the point and mechanism of this behaviour is half of the question. A new *out* is not created for each thread, but a separate one does seem to be used for the thread pool. (To get this output, reduce num-threads to 1 and add printing of (.toString *out*) to safe-println. Increasing num-threads doesn't produce new object addresses.)

(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!
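One plausible reading of the two writers above is that a thread created directly by an executor does not inherit the submitting thread's dynamic bindings, so inside the pool *out* falls back to its root value. The sketch below illustrates that mechanism in isolation; run-in-pool, captured and results are hypothetical names introduced here and are not taken from the gist.

```clojure
(import '(java.io StringWriter)
        '(java.util.concurrent Callable Executors ExecutorService))

(defn run-in-pool
  "Hypothetical helper: runs the zero-argument fn f on a fresh
   single-thread pool and returns its result."
  [f]
  (let [^ExecutorService pool (Executors/newFixedThreadPool 1)]
    (try
      ;; The Callable hint selects submit(Callable), so .get returns f's value.
      (.get (.submit pool ^Callable f))
      (finally (.shutdown pool)))))

;; Stand-in for whatever writer the REPL binds *out* to on the calling thread.
(def captured (StringWriter.))

(def results
  (binding [*out* captured]
    [;; Plain fn: the pool thread has no thread-local binding for *out*,
     ;; so it sees the root value rather than `captured`.
     (run-in-pool (fn [] (identical? *out* captured)))
     ;; bound-fn captures the caller's bindings and reinstalls them
     ;; on whichever thread eventually invokes the function.
     (run-in-pool (bound-fn [] (identical? *out* captured)))]))

;; results is [false true]
```

If this is what happens in the question, wrapping the submitted tasks in bound-fn (or using future, which conveys bindings) would make the pooled threads write to the same *out* as the main thread. Whether that alone fixes the ordering in Cursive is an assumption that would need to be checked there.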

EDIT3: Replaced map with doseq after @glts's comment. Also, when run from lein repl, it always produces the correct output, which confuses me further. So, as David Arenas pointed out, it seems the behaviour depends on how output is handled upstream. However, the questions still stand.

EDIT4: David Arenas also checked this in CIDER and could not reproduce the behaviour. It seems to have something to do with Cursive's nREPL output handling implementation.

1 answer

David Arenas (best answer):

Clojure's *out* does not create an instance for each thread (its root value is also static final), but that root value is an OutputStreamWriter, which gives no atomicity guarantees across separate writes. Since you are writing to a single stream from several threads, you would need to synchronize them on a shared buffer.
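As a minimal sketch of that kind of synchronization, assuming a single shared lock object is acceptable: the names print-lock and locked-println are hypothetical, and this only keeps each line intact on whichever writer *out* refers to. It cannot fix the relative order of output sent to two different writers.

```clojure
(require '[clojure.string :as str])

;; Hypothetical lock shared by every thread that prints.
(def print-lock (Object.))

(defn locked-println
  "Joins the arguments with spaces and writes them as one line,
   holding print-lock for the duration of the write and the flush."
  [& more]
  (let [line (str (str/join " " more) \newline)]
    (locking print-lock
      (.write ^java.io.Writer *out* ^String line)
      (.flush ^java.io.Writer *out*))))
```

Building the whole line before taking the lock keeps the critical section short; only the write and the flush are serialized.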

If you run your code using nREPL, you'll see that you get the "correct" behavior. This is because it rebinds *out* to its own writer, which uses a locking buffer.

nrepl's session-out:

(defn- session-out
  "Returns a PrintWriter suitable for binding as *out* or *err*.  All of
   the content written to that PrintWriter will (when .flush-ed) be sent on the
   given transport in messages specifying the given session-id.
   `channel-type` should be :out or :err, as appropriate."
  [channel-type session-id transport]
  (let [buf (clojure.tools.nrepl.StdOutBuffer.)]
    (PrintWriter. (proxy [Writer] []
                    (close [] (.flush ^Writer this))
                    (write [& [x ^Integer off ^Integer len]]
                      (locking buf
                        (cond
                          (number? x) (.append buf (char x))
                          (not off) (.append buf x)
                          ; the CharSequence overload of append takes an *end* idx, not length!
                          (instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
                          :else (.append buf ^chars x off len))
                        (when (<= *out-limit* (.length buf))
                          (.flush ^Writer this))))
                    (flush []
                      (let [text (locking buf (let [text (str buf)]
                                                (.setLength buf 0)
                                                text))]
                        (when (pos? (count text))
                          (t/send (or (:transport *msg*) transport)
                                  (response-for *msg* :session session-id
                                                channel-type text))))))
                  true)))