Stop threads from interleaving output

629 views Asked by At

The following program creates two threads running concurrently, that each sleep for a random amount of time, before printing a line of text to stdout.

import Control.Concurrent
import Control.Monad
import System.Random

randomDelay t = randomRIO (0, t) >>= threadDelay

printer str = forkIO . forever $ do
  randomDelay 1000000 -- μs
  putStrLn str

main = do
  printer "Hello"
  printer "World"
  return ()

The output generally looks something like

>> main
Hello
World
World
Hello
WoHrelld
o
World
Hello
*Interrupted
>>

How do you ensure that only one thread can write to stdout at a time? This seems like the kind of thing that STM should be good at, but all STM transactions must have the type STM a for some a, and an action that prints to the screen has type IO a, and there doesn't seem to be a way to embed IO into STM.

5

There are 5 answers

0
shang On BEST ANSWER

The way to handle output with STM is to have an output queue that is shared between all threads and which is processed by a single thread.

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.Random

randomDelay t = randomRIO (0, t) >>= threadDelay

printer queue str = forkIO . forever $ do
  randomDelay 1000000 -- μs
  atomically $ writeTChan queue str

prepareOutputQueue = do
    queue <- newTChanIO
    forkIO . forever $ atomically (readTChan queue) >>= putStrLn
    return queue

main = do
  queue <- prepareOutputQueue
  printer queue "Hello"
  printer queue "World"
  return ()
0
Petr On

Locking in the way you're describing isn't possible usingSTM. This is because STM is based on optimistic locking and so every transaction must be restartable at any point. If you embedded an IO operation into STM, it could be executed multiple times.

Probably the easiest solution for this problem is to use a MVar as a lock:

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import System.Random

randomDelay t = randomRIO (0, t) >>= threadDelay

printer lock str = forkIO . forever $ do
  randomDelay 1000000
  withMVar lock (\_ -> putStrLn str)

main = do
  lock <- newMVar ()
  printer lock "Hello"
  printer lock "World"
  return ()

In this solution the lock is passed as an argument to printer.

Some people prefer to declare the lock as a top-level global variable, but currently this requires unsafePerformIO and relies on properties of GHC that AFAIK aren't part of the Haskell Language Report (in particular, it relies on the fact that a global variable with non-polymorphic type is evaluated at most once during the execution of a program).

0
Chris Taylor On

A bit of research, based on Petr Pudlák's answer shows that there is a module Control.Concurrent.Lock in the concurrent-extra package that provides an abstraction around MVar ()-based locks.

The solution using that library is

import           Control.Concurrent
import qualified Control.Concurrent.Lock as Lock
import           Control.Monad
import           System.Random

randomDelay t = randomRIO (0, t) >>= threadDelay

printer lock str = forkIO . forever $ do
  randomDelay 1000
  Lock.with lock (putStrLn str)

main = do
  lock <- Lock.new
  printer lock "Hello"
  printer lock "World"
  return ()
0
dfeuer On

You can actually implement a lock using STM if you want, though an MVar will almost certainly perform better.

newtype Lock = Lock (TVar Status)
data Status = Locked | Unlocked

newLocked :: IO Lock
newLocked = Lock <$> newTVarIO Locked

newUnlocked :: IO Lock
newUnlocked = Lock <$> newTVarIO Unlocked

-- | Acquire a lock.
acquire :: Lock -> IO ()
acquire (Lock tv) = atomically $
  readTVar tv >>= \case
    Locked -> retry
    Unlocked -> writeTVar tv Locked

-- | Try to acquire a lock. If the operation succeeds,
-- return `True`.
tryAcquire :: Lock -> IO Bool
tryAcquire (Lock tv) = atomically $
  readTVar tv >>= \case
    Locked -> pure False
    Unlocked -> True <$ writeTVar tv Locked

-- | Release a lock. This version throws an exception
-- if the lock is unlocked.
release :: Lock -> IO ()
release (Lock tv) = atomically $
  readTVar tv >>= \case
    Unlocked -> throwSTM DoubleRelease
    Locked -> writeTVar tv Unlocked

data DoubleRelease = DoubleRelease deriving Show
instance Exception DoubleRelease where
  displayException ~DoubleRelease = "Attempted to release an unlocked lock."

-- | Release a lock. This version does nothing if
-- the lock is unlocked.
releaseIdempotent :: Lock -> IO ()
releaseIdempotent (Lock tv) = atomically $ writeTVar tv Unlocked

-- | Get the status of a lock.
isLocked :: Lock -> IO Status
isLocked (Lock tv) = readTVarIO tv

acquire/release pairs need careful masking and exception handling, much like primitive MVar operations. The documentation suggests, but does not actually state, that STM operations are interruptible when they retry; assuming this is true, the same approach used for withMVar will work here. Note: I've opened a GHC ticket to document retry interruptibility.

2
Kevin Zhu On

This is the example using global lock as mentioned by Petr.

import Control.Concurrent
import Control.Monad
import System.Random
import Control.Concurrent.MVar  (newMVar, takeMVar, putMVar, MVar)
import System.IO.Unsafe (unsafePerformIO)


{-# NOINLINE lock #-}
lock :: MVar ()
lock = unsafePerformIO $ newMVar ()



printer x = forkIO . forever $ do
   randomDelay 100000
   () <- takeMVar lock
   let atomicPutStrLn str =  putStrLn str >> putMVar lock ()
   atomicPutStrLn x

randomDelay t = randomRIO (0, t) >>= threadDelay



main = do
  printer "Hello"
  printer "World"
  return ()