Construct a pipes Proxy inside-out

184 views Asked by At

Is it possible to make a function so that a Proxy from pipes can be constructed inside-out? By inside-out, I mean create a proxy from a function that connects the upstream and downstream connections. The most desirable (but impossible) signature would be

makeProxy :: (Monad m) =>
             (Server a' a m r -> Client b' b m r -> Effect m r) ->
              Proxy  a' a               b' b               m r

The first problem we encounter is the mechanical problem of constructing the Proxy. There's no way for us to know if the function looks at the Server or Client except by having each of them be M, in which case we'll only know which one it looked at, not what value it tried to send upstream or downstream. If we focus on the upstream end, the only thing we know is that something tried to figure out what the upstream proxy is, so we need to decide on either always resulting in a Request farther upstream or Responding. Either way we answer, the only value we can provide is (). This means we can make a Request () to an upstream producer or Respond () immediately. If we consider making this choice for both ends, there are only four possible functions. The following functions are named after whether their upstream and downstream connections send interesting data downstream (D) or upstream (U).

betweenDD :: (Monad m) =>
             (Server () a m r -> Client () b m r -> Effect m r) ->
              Proxy  () a               () b               m r
betweenDD = undefined

betweenDU :: (Monad m) =>
             (Server () a m r -> Client b' () m r -> Effect m r) ->
              Proxy  () a               b' ()               m r
betweenDU = undefined

betweenUU :: (Monad m) =>
             (Server a' () m r -> Client b' () m r -> Effect m r) ->
              Proxy  a' ()               b' ()               m r
betweenUU f = reflect (betweenDD g)
    where g source sink = f (reflect sink) (reflect source)


betweenUD :: (Monad m) =>
             (Server a' () m r -> Client () b m r -> Effect m r) ->
              Proxy  a' ()               () b               m r
betweenUD = undefined

betweenDD is the most interesting, it would build a pipe between a Producer and a Consumer; betweenUU would do the same for a pipe running upstream. betweenDU would consume data requesting it from one of two sources. betweenUD would produce data, sending it to one of two destinations.

Can we provide a definition for betweenDD? If not, can we instead provide definitions for the following simpler functions?

belowD :: (Monad m) =>
          (Producer a m r -> Producer b m r) ->
           Proxy () a              () b m r

aboveD :: (Monad m) =>
          (Consumer b m r -> Consumer a m r) ->
           Proxy () a              () b m r

This question was motivated by trying to write belowD to use in answering a question about P.zipWith.

Example

This example happens to be essentially the question that inspired this question..

Let's say we want to create a Pipe that will number the values passing through it. The Pipe will have values a coming downstream from above and values (n, a) leaving downstream below; in other words it will be a Pipe a (n, a).

We'll solve this problem by zipping with the numbers. The result of ziping with the numbers is a function (->) from a Producer a to a Producer (n, a).

import qualified Pipes.Prelude as P

number' :: (Monad m, Num n, Enum n) => Producer a m () -> Producer (n, a) m ()
number' = P.zip (fromList [1..])

Even though the Pipe will consume as from upstream, from the point of view of the function it needs a Producer of as to provide those values. If we had a definition for belowD we could write

number :: (Monad m, Num n, Enum n) => Pipe a (n, a) m ()
number = belowD (P.zip (fromList [1..]))

given a suitable definition for fromList

fromList :: (Monad m) => [a] -> Producer a m ()
fromList []     = return ()
fromList (x:xs) = do
    yield x
    fromList xs
2

There are 2 answers

2
Gabriella Gonzalez On BEST ANSWER

Actually, I think makeProxy is possible if you slightly change the type. I am on my phone so I cannot type check this just yet, but I believe this works:

{-# LANGUAGE RankNTypes #-}

import Control.Monad.Trans.Class (lift)
import Pipes.Core

makeProxy
    ::  Monad m
    =>  (   forall n. Monad n
        =>  (a' -> Server a' a n r)
        ->  (b  -> Client b' b n r)
        ->         Effect      n r
        )
    ->  Proxy a' a b' b m r
makeProxy k = runEffect (k up dn)
  where
    up = lift . request \>\ pull
    dn = push />/ lift . respond

This assumes that k is defined as:

k up dn = up ->> k >>~ dn

Edit: Yeah, it works if you add an import for lift

I'll walk through why this works.

First, let me set out some of the pipes definitions and laws:

-- Definition of `push` and `pull`
(1) pull = request >=> push
(2) push = respond >=> pull

-- Read this as: f * (g + h) = (f * g) + (f * h)
(3) f \>\ (g >=> h) = (f \>\ g) >=> (f \>\ h)

-- Read this as: (g + h) * f = (g * f) + (h * f)
(4) (g >=> h) />/ f = (g />/ f) >=> (h />/ f)

-- Right identity law for the request category
(5) f \>\ request = f

-- Left identity law for the respond category
(6) respond />/ f = f

-- Free theorems (equations you can prove from the types alone!)
(7) f \>\ respond = respond
(8) request />/ f = request

Now let's use those equations to expand out up and dn:

up = (lift . request) \>\ pull
   = (lift . request) \>\ (request >=> push)  -- Equation (1)
   = (lift . request \>\ request) >=> (lift . request \>\ push)  -- Equation (3)
   = lift . request >=> (lift . request \>\ push)                -- Equation (5)
   = lift . request >=> (lift . request \>\ (respond >=> pull))  -- Equation (2)
   = lift . request >=> (lift . request \>\ respond) >=> (lift . request \>\ pull) -- Equation (3)
   = lift . request >=> respond >=> (lift . request \>\ pull)    -- Equation (7)
up = lift . request >=> respond >=> up

-- Same steps, except symmetric
dn = lift . respond >=> request >=> dn

In other words, up converts all requests going out of k's upstream interface into lift . request and dn converts all responds going out of k's downstream interface into lift . respond. In fact, we can prove that:

(9)  (f \>\ pull) ->> p = f \>\ p
(10) p >>~ (push />/ f) = p />/ f

... and if we apply those equations to k, we get:

  (lift . request \>\ pull) ->> k >>~ (push />/ lift . respond)
= lift . request \>\ k />/ lift . respond

This says the same thing except more directly: we're replacing every request in k with lift . request and replacing every respond in k with lift . respond.

Once we lower all requests and responds to the base monad, we end up with this type:

lift . request \>\ k />/ lift . respond :: Effect' (Proxy a' a b' b m) r

Now we can delete the outer Effect using runEffect. This leaves behind the "inside-out" Proxy.

This is also the same trick that Pipes.Lift.distribute uses to swap the order of the Proxy monad with the monad underneath it:

http://hackage.haskell.org/package/pipes-4.1.4/docs/src/Pipes-Lift.html#distribute

0
Sassa NF On

(Sorry, I missed a couple brackets on a sleepy head, so the first answer was to a different question)

Producer' a m r -> Producer' b m r is the definition of a Pipe a b m r - it can consume a and produce b.

belowD ::Monad m => (Producer' a m () -> Producer' b m r) -> Pipe a b m ()
belowD g = sequence_ $ repeat $ do
             x <- await -- wait for `a` as a Pipe
             g $ yield x -- pass a trivial Producer to g, and forward output

This one will expect one or more b for each a. If g needs more than one a to produce one b, it won't produce anything.


But then since Proxy a b c d m is a Monad, we can lift await:

belowD :: Monad m => (forall m . Monad m => Producer a m () -> Producer b m r) ->
                     Pipe a b m r
belowD g = h . g $ sequence_ $ repeat ((lift $ await) >>= yield) where
  h :: Monad m => Producer b (Pipe a b m) r -> Pipe a b m r
  h p = do
      x <- next p
      case x of
        Left r -> return r
        Right (x,p) -> do
                         yield x
                         h p

h :: Monad m => Producer a m () -> Producer a m ()
h :: Monad m => Producer a m () -> Producer a m ()
h p = p >-> (sequence_ $ repeat $ await >>= yield >> await) -- skips even

main = runEffect $ (mapM_ yield [1..10]) >-> (for (belowD h) $ lift . print)

> 1
> 3
> 5
> 7
> 9