Say I have a simple producer/consumer model where the consumer wants to pass back some state to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be some token representing where the object was written in the file (e.g. an offset).
These two processes might look something like this (with pipes-4.0),

    {-# LANGUAGE GeneralizedNewtypeDeriving #-}

    import Pipes
    import Pipes.Core
    import Control.Monad.Trans.State
    import Control.Monad

    newtype Object = Obj Int
                   deriving (Show)

    newtype ObjectId = ObjId Int
                     deriving (Show, Num)

    writeObjects :: Proxy ObjectId Object () X IO r
    writeObjects = evalStateT (forever go) (ObjId 0)
      where go = do i <- get
                    obj <- lift $ request i
                    lift $ lift $ putStrLn $ "Wrote "++show obj
                    modify (+1)

    produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
    produceObjects = go
      where go [] = return ()
            go (obj:rest) = do
                lift $ putStrLn $ "Producing "++show obj
                objId <- respond obj
                lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
                go rest

    objects = [ Obj i | i <- [0..10] ]

Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,

1. writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream.
2. produceObjects sends the first object, Obj 0, downstream.
3. writeObjects writes the object and increments its state, and waits on request, this time sending ObjId 1 upstream.
4. respond in produceObjects returns with ObjId 0.
5. produceObjects continues at Step (2) with the second object, Obj 1.

My initial attempt was with push-based composition as follows,

    main = void $ run $ produceObjects objects >>~ const writeObjects

Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,

    Producing Obj 0
    Wrote Obj 0
    Object Obj 0 has ID ObjId 1
    Producing Obj 1
    ...

A pull-based approach,

    main = void $ run $ const (produceObjects objects) +>> writeObjects

suffers a similar issue, this time dropping Obj 0.

How might one go about composing these pieces in the desired manner?

The choice of which composition to use depends on which component should initiate the entire process. If you want the downstream pipe to initiate the process then you want to use pull-based composition (i.e. (>+>)/(+>>)) but if you want the upstream pipe to initiate the process then you should use push-based composition (i.e. (>>~)/(>~>)). The type errors you got were actually warning you that there is a logical error in your code: you haven't clearly established which component initiates the process first.

From your description, it's obvious that you want control flow to begin from produceObjects so you want to use push-based composition. Once you use push-based composition, the type of the composition operator will tell you everything you need to know about how to fix your code. I'll take its type and specialize it to your composition chain:

As you already noticed, the type error you got when you tried to use (>>~) told you that you were missing an argument of type Object to your writeObjects function. This statically enforces that you cannot run any code in writeObjects before receiving your first Object (through the initial argument).

The solution is to rewrite your writeObjects function like this:

This then gives the correct behavior:
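
The code this answer refers to (the specialized type, the rewritten function, and its output) is not shown above, so the following is only a sketch of what it could look like, not necessarily the exact original listing. It assumes a released pipes 4.x, where the runner is named runEffect (the question's run is assumed to be a pre-release name for the same thing), and it shortens the input to three objects. The comment block spells out (>>~) specialized to the two pipes; writeObjects now takes its first Object as an argument and only calls request after it has written that object.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Main where

import Control.Monad.Trans.State (evalStateT, get, modify)
import Pipes
import Pipes.Core

newtype Object = Obj Int
  deriving (Show)

newtype ObjectId = ObjId Int
  deriving (Show, Num)

-- (>>~) specialized to this pipeline:
--
--   (>>~) :: Proxy X () ObjectId Object IO ()              -- produceObjects objects
--         -> (Object -> Proxy ObjectId Object () X IO ())  -- writeObjects
--         -> Proxy X () () X IO ()                         -- i.e. Effect IO ()

-- Passive side: it cannot run any code until it is handed its first Object.
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where
    go obj = do
      i <- get
      lift $ lift $ putStrLn $ "Wrote " ++ show obj
      modify (+ 1)
      -- Send this object's ID upstream, then wait for the next Object.
      obj' <- lift $ request i
      go obj'

-- Active side: same logic as in the question, it takes no initial argument.
produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects [] = return ()
produceObjects (obj : rest) = do
  lift $ putStrLn $ "Producing " ++ show obj
  objId <- respond obj
  lift $ putStrLn $ "Object " ++ show obj ++ " has ID " ++ show objId
  produceObjects rest

main :: IO ()
main = runEffect $ produceObjects [Obj i | i <- [0 .. 2]] >>~ writeObjects
```

Tracing the control flow by hand, this should print Producing Obj 0, Wrote Obj 0, Object Obj 0 has ID ObjId 0, and then the same three lines for Obj 1 and Obj 2 with matching IDs, so neither an ID nor an object is dropped.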

You might wonder why this requirement that one of the two pipes takes an initial argument makes sense, other than the abstract justification that this is what the category laws require. The plain English explanation is that the alternative is that you would need to buffer the first transmitted Object "in between" the two pipes before writeObjects reached its first request statement. This approach produces a lot of problematic behavior and buggy corner cases, but probably the most significant problem is that pipe composition would no longer be associative and the order of effects would change based on the order in which you composed things.

The nice thing about the bidirectional pipe composition operators is that the types work out so that you can always deduce whether or not a component is "active" (i.e. initiates control) or "passive" (i.e. waits for input) purely by studying the type. If composition says that a certain pipe (like writeObjects) must take an argument, then it's passive. If it takes no argument (like produceObjects), then it's active and initiates control. So composition forces you to have at most one active pipe within your pipeline (the pipe that doesn't take an initial argument) and that's the pipe that begins control.
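
For contrast, here is a minimal sketch of the mirror-image situation under pull-based composition, where the roles flip: the downstream pipe takes no argument and is active, while the upstream pipe takes the first request as its argument and is passive. The names doubler and asker are made up for illustration and are not part of the question; the sketch assumes a released pipes 4.x with runEffect and the Server/Client synonyms from Pipes.Core.

```haskell
module Main where

import Pipes
import Pipes.Core

-- Passive under (+>>): it needs the first request (an Int) before it can run.
doubler :: Int -> Server Int Int IO r
doubler n = do
  lift $ putStrLn $ "Server received " ++ show n
  next <- respond (2 * n)  -- answer the request, then wait for the next one
  doubler next

-- Active under (+>>): it takes no argument, so it begins control.
asker :: Client Int Int IO ()
asker = do
  a <- request 1
  lift $ putStrLn $ "Client received " ++ show a
  b <- request 5
  lift $ putStrLn $ "Client received " ++ show b

-- (+>>) :: (Int -> Server Int Int IO ()) -> Client Int Int IO () -> Effect IO ()
main :: IO ()
main = runEffect $ doubler +>> asker
```

Reading only the two type signatures is enough to tell which side starts: asker has no parameter, so the first effect to run is its request 1, and doubler only wakes up once that request arrives.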