conduit sink with leftover


I have a sink in which I want to do some parsing with attoparsec. Sometimes the parser returns a Partial result, so I thought I could use leftover to push the insufficient input back upstream, expecting it to come back later with more content appended. But no new content gets appended. I would very much appreciate any suggestions on how to solve this. Thanks!

{-# LANGUAGE OverloadedStrings #-}

import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.ByteString.Char8 as BS
import Data.Attoparsec.Char8


main = (CL.sourceList [BS.pack "foo", BS.pack "bar"]) $$ sink -- endless loop

-- this works:
-- main = (CL.sourceList [BS.pack "foobar"]) $$ sink

sink :: Sink BS.ByteString IO ()
sink = awaitForever $ \str -> do
                  liftIO $ putStrLn $ BS.unpack str -- debug, will print foo forever.
                  case (parse (string "foobar") str) of
                       Fail _ _ _ -> do
                                    liftIO $ putStr $ "f: " ++ BS.unpack str
                                    sink
                       Partial _ -> do
                                    leftover str
                                    sink
                       Done rest final -> do
                                          liftIO $ putStr $ "d: " ++ show final ++ " // " ++ show rest
                                          sink

There are 2 answers

Petr (best answer)

Keep in mind that Conduit has no concept of concatenating a leftover with the next chunk of input. So what happens is:

  • The conduit gets a partial input.
  • It's not enough to parse.
  • You put it back as a leftover.
  • The conduit reads back exactly the chunk you just put back.
  • And this goes on forever.

If you really want to pursue the direction of repeatedly trying the parser, you need to ensure that each time you put a leftover value back it's larger than the previous time. So you'd do something like this: If the parser doesn't finish, read additional input, concatenate it with the input you already have, push this back as a leftover and try again.
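The retry procedure described above could be sketched roughly as follows. The names retrySink and retryChunks are made up for this illustration, the error strings are arbitrary, and the module names assume a reasonably recent attoparsec and conduit (Data.Attoparsec.ByteString, ConduitT, runConduitPure). Note that every retry re-parses the whole buffer from the start.

```haskell
import Control.Monad (unless)
import Data.Attoparsec.ByteString (IResult (..), Parser, parse, string)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: on Partial, glue the next chunk onto the buffer,
-- push the larger buffer back as a leftover, and parse again from scratch.
retrySink :: Monad m => Parser a -> ConduitT BS.ByteString o m (Either String a)
retrySink p = do
  mbuf <- await
  case mbuf of
    Nothing -> pure (Left "no input")
    Just buf -> case parse p buf of
      Done rest x -> do
        -- Hand unconsumed input back to whoever reads next.
        unless (BS.null rest) (leftover rest)
        pure (Right x)
      Fail _ _ err -> pure (Left err)
      Partial _ -> do
        mmore <- await
        case mmore of
          Nothing -> pure (Left "unexpected end of input")
          Just more -> do
            -- The leftover is strictly larger than before, so progress is made.
            leftover (BS.append buf more)
            retrySink p

-- Hypothetical driver for trying the sink on a list of chunks.
retryChunks :: [BS.ByteString] -> Either String BS.ByteString
retryChunks chunks =
  runConduitPure (CL.sourceList chunks .| retrySink (string (BS.pack "foobar")))
```

With the chunks "foo" and "bar" from the question, the first attempt yields Partial, the two chunks are concatenated into "foobar" and pushed back, and the second attempt succeeds.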

Note that the above procedure has complexity O(n^2), because every retry re-parses the whole buffer from the start. This is particularly problematic if your parser succeeds only after consuming a big block of data. If you receive one character at a time (which might happen) and the parser needs to consume 1000 characters, you'll get roughly 500,000 processing steps (1 + 2 + ... + 1000 = 500,500). So I'd strongly suggest using either the provided binding between Conduit and Attoparsec (the Data.Conduit.Attoparsec module), or, if you want to do it yourself, properly using the continuation provided by Partial.
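For the first suggestion, a minimal sketch using the existing binding might look like this. It assumes the conduit-extra package, whose Data.Conduit.Attoparsec module exports sinkParserEither; the name parseFoobar is made up for the example.

```haskell
import Data.Attoparsec.ByteString (string)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit (runConduitPure, (.|))
import Data.Conduit.Attoparsec (ParseError, sinkParserEither)
import qualified Data.Conduit.List as CL

-- Hypothetical driver: sinkParserEither feeds each chunk to the parser's
-- continuation internally, so input split across chunks is handled for us.
parseFoobar :: [BS.ByteString] -> Either ParseError BS.ByteString
parseFoobar chunks =
  runConduitPure (CL.sourceList chunks .| sinkParserEither (string (BS.pack "foobar")))
```

Here the chunks "foo" and "bar" are parsed as if they were one contiguous input, without any manual buffering or leftover handling.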

Paul Johnson

The idea of "Partial" is that it hands you a continuation function: once you have more input, you call the continuation with that input. Pushing the leftover input back onto the input stream is wasteful at best, because you repeatedly parse the first bit of input.

You need to write your sink so that it takes the parse function as a parameter: initially the parser itself, afterwards the continuation. Then your Partial case should read

Partial c -> sink c

That will cause "sink" to wait for more input and then hand it to the "c" function, which will continue parsing the new input from where it left off.
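A fuller sketch of this idea, written against a recent conduit API, could look like the following. The names parseSink and parseChunks are invented for the illustration, and returning Either String is just one possible design; the original question printed results instead.

```haskell
import Control.Monad (unless)
import Data.Attoparsec.ByteString (IResult (..), Parser, parse, string)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical sink: 'go' carries the current parse function, which is the
-- parser itself at first and the Partial continuation afterwards.
parseSink :: Monad m => Parser a -> ConduitT BS.ByteString o m (Either String a)
parseSink p = go (parse p)
  where
    go step = do
      mchunk <- await
      case mchunk of
        -- End of stream: an empty chunk tells attoparsec no more input is coming.
        Nothing -> case step BS.empty of
          Partial k -> finish (k BS.empty)
          r -> finish r
        Just chunk
          -- Skip empty chunks, which attoparsec would read as end of input.
          | BS.null chunk -> go step
          | otherwise -> case step chunk of
              Partial k -> go k -- wait for more input, resume where we left off
              r -> finish r
    finish (Done rest x) = do
      -- Only genuinely unconsumed input goes back upstream.
      unless (BS.null rest) (leftover rest)
      pure (Right x)
    finish (Fail _ _ err) = pure (Left err)
    finish (Partial _) = pure (Left "unexpected end of input")

-- Hypothetical driver for trying the sink on a list of chunks.
parseChunks :: [BS.ByteString] -> Either String BS.ByteString
parseChunks chunks =
  runConduitPure (CL.sourceList chunks .| parseSink (string (BS.pack "foobar")))
```

Each chunk is handed to the parser exactly once, so the work stays linear in the input size, and leftover is used only for input the parser did not consume.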