Streaming xml-conduit parse results

I want to use xml-conduit, specifically Text.XML.Stream.Parse in order to lazily extract a list of objects from a large XML file.

As a test case, I use the recently re-released StackOverflow data dumps. To keep it simple, I intend to extract all usernames from stackoverflow.com-Users.7z. Although the file has a .7z extension, the file utility reports that it is just bzip2-compressed data (there might be some 7zip data at the end of the file, but right now I don't care).
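
For anyone who wants to repeat that check without the file utility, here is a minimal sketch. It relies on the fact that a bzip2 stream starts with the ASCII bytes "BZh" followed by a level digit from 1 to 9. The names looksLikeBzip2 and isBzip2File are made up for this illustration and are not part of any library.

```haskell
import qualified Data.ByteString       as B
import qualified Data.ByteString.Char8 as BC
import           System.IO             (IOMode (ReadMode), withBinaryFile)

-- A bzip2 stream begins with "BZh" followed by a level digit '1'..'9'.
-- (Hypothetical helper, named for this sketch only.)
looksLikeBzip2 :: B.ByteString -> Bool
looksLikeBzip2 bs =
    BC.pack "BZh" `B.isPrefixOf` bs
        && B.length bs >= 4
        && BC.index bs 3 `elem` ['1' .. '9']

-- Read only the first four bytes, so the check stays cheap for huge dumps.
isBzip2File :: FilePath -> IO Bool
isBzip2File path = withBinaryFile path ReadMode $ \h ->
    looksLikeBzip2 <$> B.hGet h 4
```

In GHCi, isBzip2File "stackoverflow.com-Users.7z" would then report whether the header matches. This only inspects the header, so it says nothing about trailing data.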

A simplified version of the XML would be

<users>
    <row id="1" DisplayName="StackOverflow"/>
    ...
    <row id="2597135" DisplayName="Uli Köhler"/>
    ... 
</users>

Based on this previous Q&A and the example on Hackage, stream-reading the example XML in bz2-compressed form works perfectly for me.

However, when using runghc to run the following program, it runs without printing any output:

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit (runResourceT, ($$), ($=))
import qualified Data.Conduit.Binary as CB
import Data.Conduit.BZlib
import Data.Conduit
import Data.Text (Text)
import System.IO
import Text.XML.Stream.Parse
import Control.Applicative ((<*))

data User = User {name :: Text} deriving (Show)

parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers = tagNoAttr "users" $ many parseUserRow

main = do
    users <- runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $= parseBytes def $$ force "users required" parseUsers
    putStrLn $ unlines $ map show users

I assume this happens because Haskell tries to fully evaluate the users list before it starts printing. This theory is supported by the program's memory usage growing steadily by about 2 percent per second (source: htop).

How can I "live-stream" the results to stdout? I assume this is possible by adding another conduit statement like $$ CB.sinkFile "output.txt" at the end. This specific version however expects a Conduit output of ByteString. Could you point me in the right direction where to go from here?

Any help will be appreciated!

There are 3 answers

1
pbarill On BEST ANSWER

Made an edit to bring the insightful example from M. Snoyman up to date, but it was tossed by mediocre power-trippers. Therefore, this.

The original no longer compiles and triggers many deprecation warnings (legacy syntax).

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import           Control.Applicative    ((<*))
import           Control.Concurrent     (threadDelay)
import           Control.Monad          (forever, void)
import           Control.Monad.Catch    (MonadThrow)
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Data.ByteString        (ByteString)
import           Data.Conduit
import qualified Data.Conduit.List      as CL
import           Data.Text              (Text)
import           Data.Text.Encoding     (encodeUtf8)
import           Data.XML.Types         (Event)
import           Text.XML.Stream.Parse

-- instead of actually including a large input data file, just for testing purposes
infiniteInput :: MonadIO m => ConduitT () ByteString m ()
infiniteInput = do
    yield "<users>"
    forever $ do
        yield $ encodeUtf8
            "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>"
        liftIO $ threadDelay 1000000
    --yield "</users>" -- will never be reached

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => forall o. ConduitT Event o m (Maybe User)
parseUserRow = tag' "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => ConduitT Event User m ()
parseUsers = void $ tagNoAttr "users" $ manyYield parseUserRow

-- Not used above: manyYield, now provided by Text.XML.Stream.Parse, does the same job.
yieldWhileJust :: Monad m
               => ConduitT a b m (Maybe b)
               -> ConduitT a b m ()
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = runConduit $ infiniteInput
    .| parseBytes def
    .| parseUsers
    .| CL.mapM_ print

Versions used: GHC 8.6.5, xml-conduit 1.9.0.0.

2
Michael Snoyman On

Let me start by saying that the streaming helper API in xml-conduit has not been worked on in years, and could probably benefit from a reimagining given the changes that have happened to conduit in the interim. I think there are likely much better ways to accomplish things.

That said, let me explain the problem you're seeing. The many function creates a list of results, and will not produce any values until it has finished processing. In your case, there are so many values that this appears to never happen. Ultimately, when the entire file has been read, the entire list of users will be displayed at once. But that's clearly not the behavior you're looking for.
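
To make the difference concrete outside of conduit, here is a rough model in plain IO. It is only a sketch: mkProducer, collectAll, streamEach and demo are hypothetical names invented for this illustration, and the "parser" is just a counter over a list. collectAll behaves like many (nothing reaches the caller until the input is exhausted), while streamEach hands over each item as soon as it exists.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for a row parser: each call hands out the next item
-- (logging that it was parsed), or Nothing once the input is exhausted.
mkProducer :: IORef [String] -> [String] -> IO (IO (Maybe String))
mkProducer logRef items = do
    rest <- newIORef items
    return $ do
        next <- atomicModifyIORef' rest $ \xs -> case xs of
            []       -> ([], Nothing)
            (y : ys) -> (ys, Just y)
        case next of
            Nothing -> return Nothing
            Just x  -> do
                modifyIORef' logRef (("parsed " ++ x) :)
                return (Just x)

-- Like many: the caller sees nothing until the producer is finished.
collectAll :: IO (Maybe a) -> IO [a]
collectAll step = go []
  where
    go acc = step >>= maybe (return (reverse acc)) (\x -> go (x : acc))

-- Like the streaming approach: each item is passed on immediately.
streamEach :: IO (Maybe a) -> (a -> IO ()) -> IO ()
streamEach step sink = loop
  where
    loop = step >>= maybe (return ()) (\x -> sink x >> loop)

-- Run both strategies over the same input and return the two event logs.
demo :: IO ([String], [String])
demo = do
    let names = ["StackOverflow", "Uli"]
        emit ref x = modifyIORef' ref (("printed " ++ x) :)
    logA <- newIORef []
    stepA <- mkProducer logA names
    collectAll stepA >>= mapM_ (emit logA)
    logB <- newIORef []
    stepB <- mkProducer logB names
    streamEach stepB (emit logB)
    a <- reverse <$> readIORef logA
    b <- reverse <$> readIORef logB
    return (a, b)
```

Running demo >>= print in GHCi shows the two orderings: in the first log every "parsed" entry comes before any "printed" entry, in the second they alternate. With an input that never ends, the first strategy never gets to print at all.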

Instead, what you want is a stream of User values which are produced as soon as they're ready. That basically means replacing the many function call with a new function which yields each result as soon as it has been parsed. A simple implementation of this could be:

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

Also, instead of using putStrLn $ unlines $ map show, you want to attach the entire pipeline to a consumer which will print each individually yielded User value. This can be implemented easily with Data.Conduit.List.mapM_, e.g.: CL.mapM_ (liftIO . print).

I've put together a full example based on your code. The input is an artificially generated infinite XML file, just to prove the point that it really is yielding output immediately.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import           Control.Applicative    ((<*))
import           Control.Concurrent     (threadDelay)
import           Control.Monad          (forever, void)
import           Control.Monad.Trans.Resource (MonadThrow)
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Data.ByteString        (ByteString)
import           Data.Conduit
import qualified Data.Conduit.List      as CL
import           Data.Text              (Text)
import           Data.Text.Encoding     (encodeUtf8)
import           Data.XML.Types         (Event)
import           Text.XML.Stream.Parse

-- instead of actually including a large input data file, just for testing purposes
infiniteInput :: MonadIO m => Source m ByteString
infiniteInput = do
    yield "<users>"
    forever $ do
        yield $ encodeUtf8
            "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>"
        liftIO $ threadDelay 1000000
    --yield "</users>" -- will never be reached

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User)
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => Conduit Event m User
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = infiniteInput
    $$ parseBytes def
    =$ parseUsers
    =$ CL.mapM_ print
0
Uli Köhler On

Based on Michael Snoyman's excellent answer, here is a modified version that reads the data from stackoverflow.com-Users.7z instead of sourcing it from an artificially generated IO stream.

For a reference on how to use xml-conduit directly, please see Michael's answer. This answer is only provided as an example of how to apply the method described there to optionally compressed files.

The main change here is that you need to use runResourceT to read the file, and the final print needs to be lifted from IO () to ResourceT IO ().

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
import           Control.Applicative          ((<*))
import           Control.Monad                (void)
import           Control.Monad.Trans.Class    (lift)
import           Control.Monad.Trans.Resource (MonadThrow, runResourceT)
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import           Data.Conduit.BZlib           (bunzip2)
import qualified Data.Conduit.List            as CL
import           Data.Text                    (Text)
import           Data.XML.Types               (Event)
import           Text.XML.Stream.Parse

data User = User {name :: Text} deriving (Show)

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User)
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
    return $ User displayName

parseUsers :: MonadThrow m => Conduit Event m User
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow

yieldWhileJust :: Monad m
               => ConduitM a b m (Maybe b)
               -> Conduit a m b
yieldWhileJust consumer =
    loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()
            Just x -> yield x >> loop

main :: IO ()
main = runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $$ parseBytes def
    =$ parseUsers
    =$ CL.mapM_ (lift . print)
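
If you would rather write the names to a file with CB.sinkFile, as suggested in the question, each User first has to be turned into a ByteString. The following is a minimal sketch of that conversion; renderUser and exampleLine are hypothetical names for this illustration, and the wiring shown in the trailing comment is an assumption that has not been compiled against the program above.

```haskell
import           Data.ByteString       (ByteString)
import qualified Data.ByteString.Char8 as BC
import           Data.Text             (Text)
import qualified Data.Text             as T
import           Data.Text.Encoding    (encodeUtf8)

data User = User {name :: Text} deriving (Show)

-- Render one user as a UTF-8 encoded line, ready for a ByteString sink.
renderUser :: User -> ByteString
renderUser u = BC.snoc (encodeUtf8 (name u)) '\n'

-- A sample value, just to show the shape of the output.
exampleLine :: ByteString
exampleLine = renderUser (User (T.pack "StackOverflow"))

-- Assumed wiring, replacing the final CL.mapM_ stage of the pipeline:
--     =$ parseUsers
--     =$ CL.map renderUser
--     =$ CB.sinkFile "output.txt"
```

Encoding explicitly with encodeUtf8 keeps non-ASCII names such as "Uli Köhler" intact regardless of the locale, which print on a Show instance would not do.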