I want to implement a pipeline between two threads. I have thread A that take the data, process it, and send it to thread B. I have a MVar that check if the data is completely processed
However, I'm having an exception *** Exception: thread blocked indefinitely in an STM transaction
Why are my threads blocked? I though than when the first thread write on the channel, then when there is a data on the channel, the second one can read it
fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()
pipelineDone channel mIn = do
isDone <- fmap isJust $ tryTakeMVar mIn
isEmpty <- atomically $ isEmptyTChan channel
return $ isDone && isEmpty
lastPipe f chIn mIn = iter
where iter = do
atomically $ fmap f $ readTChan chIn
isDone <- pipelineDone chIn mIn
unless isDone $ iter
pipeline = do
chIn <- atomically newTChan
m <- newEmptyMVar
first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
last <- async $ lastPipe print chIn m
wait first
wait last
It seems odd to me to be using STM and semaphores in the same code block... Why not do the entire thing in STM?
In particular, why not a
TChan (Maybe x)
, withNothing
indicating the end of the sequence?Also, notice that your
fstPipe
likely just generates a bunch of unevaluated thunks and immediately chucks them into theTChan
, without actually computing anything. You probably want aseq
or similar in there to force some actual work to happen on that thread.