Pipeline-like operation using TChan

137 views Asked by At

I want to implement a pipeline between two threads. I have thread A that take the data, process it, and send it to thread B. I have a MVar that check if the data is completely processed

However, I'm having an exception *** Exception: thread blocked indefinitely in an STM transaction

Why are my threads blocked? I though than when the first thread write on the channel, then when there is a data on the channel, the second one can read it

fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
    ( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()

pipelineDone channel mIn = do
    isDone <- fmap isJust $ tryTakeMVar mIn
    isEmpty <- atomically $ isEmptyTChan channel
    return $ isDone && isEmpty

lastPipe f chIn mIn = iter 
    where iter = do
        atomically $ fmap f $ readTChan chIn
        isDone <- pipelineDone chIn mIn
        unless isDone $ iter

pipeline = do
    chIn <- atomically newTChan
    m <- newEmptyMVar
    first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
    last <- async $ lastPipe print chIn m
    wait first
    wait last
3

There are 3 answers

2
MathematicalOrchid On BEST ANSWER

It seems odd to me to be using STM and semaphores in the same code block... Why not do the entire thing in STM?

In particular, why not a TChan (Maybe x), with Nothing indicating the end of the sequence?

Also, notice that your fstPipe likely just generates a bunch of unevaluated thunks and immediately chucks them into the TChan, without actually computing anything. You probably want a seq or similar in there to force some actual work to happen on that thread.

0
chi On

I think there's a race condition:

  • stop fstPipe just before the putMVar
  • advance lastPipe to read everything, and then call pipelineDone
  • pipelineDone returns False since putMVar was not yet done
  • lastPipe will try to read from the channel
  • putMVar executes, but it's too late

Now lastPipe is stuck reading on an empty channel.

0
Matt On

Your problem is in the logic of pipelineDone. Currently, you have:

pipelineDone channel mIn = do
  isDone <- fmap isJust $ tryTakeMVar mIn
  isEmpty <- atomically $ isEmptyTChan channel
  return $ isDone && isEmpty

tryTakeMVar is going to take the contents of the MVar assuming there is something in there. Assuming your producer finishes first, it is going to write () into the MVar. Your consumer is then going to try and take the contents of it. If it succeeds, then the MVar goes empty. Any subsequent tryTakeMVar will always return Nothing, thus isDone && isEmpty will always return false and you will keep trying to read from the TChan. Once the TChan goes empty, GHC can tell you that it has encountered a deadlock.

You should instead change your pipelineDone implementation to:

pipelineDone channel mIn = do
  stillRunning <- isEmptyMVar mIn
  isEmpty <- atomically $ isEmptyTChan channel
  return $ (not stillRunning) && isEmpty

This will instead simply poll the MVar, instead of actually emptying it.