WITH-OUTPUT-TO-STRING with multithreading in Common Lisp

239 views Asked by At

I want to do something that means the following:

(with-output-to-string (*standard-output*)
  (bt:join-thread
   (bt:make-thread
    (lambda ()
      (format *standard-output* "Hello World")))))
;=> "" (actual output)
;=> "Hello World" (expected output)

In my understanding, this does not work because the *standard-output* that gets dynamically rebound by with-output-to-string outside the thread does not take effect inside the thread. What are the possible and recommedable ways?

In essence, I want to capture the output that was written to *standard-output* by another thread.

3

There are 3 answers

3
Rainer Joswig On BEST ANSWER

One can rebind the special variable to be thread local:

(with-output-to-string (*standard-output*)
  (bt:join-thread
    (bt:make-thread
      (lambda ()
        (format *standard-output* "Hello World"))
      :initial-bindings `((*standard-output* . ,*standard-output*)))))

*initial-bindings* is an alist with (variable . value) elements.

2
digikar On

A previous idea was to mutate the original binding of *standard-output* itself. As @coredump suggested, this has the downside that the binding will be mutated in all the threads - other threads that are sending their output to *standard-output* would also send their output to the string-output-stream.

Another idea is to let the thread itself decide whether to send the output to *standard-output* or to some other stream:

(let ((in-with-output-to-string nil)
      (output-stream-string     nil))
  (unwind-protect
       (progn
         (setq output-stream-string (make-string-output-stream))
         (setq in-with-output-to-string t)
         (bt:join-thread
          (bt:make-thread
           (lambda ()
             (format (if in-with-output-to-string
                         output-stream-string
                         *standard-output*)
                     "Hello World"))))
         (get-output-stream-string output-stream-string))
    (setq in-with-output-to-string nil)))
;=> "Hello World"

A more involved example is illustrated in the following. The general situation I was interested in involved a thread reading some stream and sending the contents of that stream to *standard-output*. However, in certain cases, I was interested in capturing the output of that thread into a string.

Even before that, drawing inspiration from [1], we define a helper macro which captures the variable bindings that were present before executing the body and then restores them once the body has completed execution.

(deftype list-of (&rest types)
  (if types
      `(cons ,(first types) (list-of ,@(rest types)))
      'null))

(defmacro thread-global-let (bindings &body body)
  (let* ((bindings (mapcar (lambda (binding)
                             ;; Normalize the bindings
                             (etypecase binding
                               (symbol
                                (list binding nil))
                               ((list-of symbol)
                                (list (first binding) nil))
                               ((list-of symbol t)
                                binding)))
                           bindings))
         (variables (mapcar #'first bindings))
         (gensyms (alexandria:make-gensym-list (length variables))))
    `(let (,@(mapcar (lambda (var gensym)
                       `(,gensym ,var))
                     variables gensyms))
       (unwind-protect
            (progn
              ,@(mapcar (lambda (binding)
                          `(setq ,@binding))
                        bindings)
              ,@body)
         ,@(mapcar (lambda (var gensym)
                     `(setq ,var ,gensym))
                   variables gensyms)))))

The main example then is the following:

(defvar *input-wait-condition* (bt:make-condition-variable))
(defvar *input-wait-lock*      (bt:make-lock))
(defvar *stream-input-string*  nil)
(defvar *thread*)

(let ((in-with-thread-output nil)
      (stream-output-string  nil))

  (when (and (boundp '*thread*)
             (bt:threadp *thread*))
    (bt:destroy-thread *thread*))
  (setq *thread*
        (bt:make-thread
         (lambda ()
           (bt:with-lock-held (*input-wait-lock*)
             (loop :do (bt:condition-wait *input-wait-condition* *input-wait-lock*)
                       (loop :while (listen *stream-input-string*)
                             :do (write-char (read-char *stream-input-string*)
                                             (if in-with-thread-output
                                                 stream-output-string
                                                 *standard-output*))))))))

  (defun thread-output-thunk (thunk)
    (thread-global-let ((stream-output-string (make-string-output-stream))
                        (in-with-thread-output t))
      (funcall thunk)
      (get-output-stream-string stream-output-string))))

(defmacro with-thread-output (&body body)
  `(thread-output-thunk (lambda () ,@body)))

What it essentially achieves is the following:

CL-USER> (setq *stream-input-string* (make-string-input-stream "Hello World"))
#<SB-IMPL::STRING-INPUT-STREAM {100D0D47A3}>
CL-USER> (bt:condition-notify *input-wait-condition*)
NIL
Hello World
CL-USER> (with-thread-output
           (thread-global-let
               ((*stream-input-string*
                 (make-string-input-stream "Output from a thread")))
             (bt:condition-notify *input-wait-condition*)
             (loop :while (listen *stream-input-string*))))
"Output from a thread"
CL-USER> (with-thread-output
           (thread-global-let
               ((*stream-input-string*
                 (make-string-input-stream "Output from a thread")))
             (print (with-thread-output
                      (thread-global-let
                          ((*stream-input-string*
                            (make-string-input-stream "Output from a thread 2")))
                        (bt:with-lock-held (*input-wait-lock*)
                          (bt:condition-notify *input-wait-condition*))
                        (loop :while (listen *stream-input-string*)))))
             (bt:with-lock-held (*input-wait-lock*)
               (bt:condition-notify *input-wait-condition*))
             (loop :while (listen *stream-input-string*))))

"Output from a thread 2"
"Output from a thread"


The following code illustrates the previous idea of mutating the original binding of *standard-output*. This has the downside of the mutation affecting all the threads.

(let ((original-stdout *standard-output*))
  (with-output-to-string (stdout)
    (unwind-protect
         (progn
           (setq *standard-output* stdout)
           (bt:join-thread
            (bt:make-thread
             (lambda ()
               (format *standard-output* "Hello World")))))
      (setq *standard-output* original-stdout))))
3
coredump On

This all seem a bit complex, and it looks like you are calling bt:join-thread to wait for the thread to finish. Unfortunately, this means that your main thread is blocked until the worker thread is finished, something that typically is a code smell in multi-threaded applications (if you block the main thread, you might as well call the function directly in the same thread).

In fact, when threads are involved, lexically scoped macros like with-X are usually either better done inside the thread, or not at all. I am going to use the lparallel library because it provides queues datastructures.

(defpackage :so (:use :cl))
(in-package :so)
(ql:quickload :lparallel)

Let's define a sample test function that uses standard input and output streams:

(defun test-function ()
  "A test function that acts on standard input/output streams"
  (format t "input is: ~s" (read-line)))

The following make-thread/redirect-io function takes a function to execute as well as its input as a string. It returns another function that will block until the thread terminates and return the accumulated output as a string. In practice you would also need to handle exceptions:

(defun make-thread/redirect-io (fn input-as-string)
  (let ((queue (lparallel.queue:make-queue)))
    (values (lambda () (lparallel.queue:pop-queue queue))
            (bt:make-thread 
             (lambda ()
               (lparallel.queue:push-queue 
                (with-output-to-string (*standard-output*)
                  (with-input-from-string (*standard-input* input-as-string)
                    (funcall fn)))
                queue))))))

This allows you to really run things in parallel, for example you can spawn two threads with different inputs:

(let ((join-a (make-thread/redirect-io #'test-function "Hello"))
      (join-b (make-thread/redirect-io #'test-function "World")))
  
  ;; do something else in parallel, do not join the thread otherwise
  ;; it would just be blocking as-if you called the function in the
  ;; same thread
  
  ;; then, get the results
  (list (funcall join-a)
        (funcall join-b)))

This returns the following list:

("input is: \"Hello\"" "input is: \"World\"")

In fact, I'd suggest using having a look at lparallel.org to see if you can use it directly, it can greatly simplify working with threads.