I am trying to adapt Store encoding and decoding for Streaming. Store already implements a streaming decode with a function called decodeMessageBS.
I tried to do a basic implementation of store deserialization for Streaming as below (without bracket for now to keep it simple). However, there seems to be something wrong with the logic for popper because decodeMessageBS keeps throwing PeekException:
{-# LANGUAGE RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming

streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
    ref <- lift $ newIORef inp
    let popper = do
          r <- S.uncons =<< readIORef ref
          case r of
            Nothing -> return Nothing
            Just (a,rest) -> writeIORef ref rest >> return (Just a)
    let go = do
          r <- lift $ decodeMessageBS bb $ popper
          lift $ print "Decoding"
          case r of
            Nothing -> return ()
            Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
    go
I can decode my test file fine with decodeIOPortionWith, so the problem seems to be in the logic that feeds decodeMessageBS. I would appreciate pointers on what is wrong with the logic of popper here.
The PeekException happens because Store uses a different format when saving messages in streaming mode, unlike Binary. It expects a wrapper of type Message around Store data when using the decodeMessageBS function. decodeIOPortionWith doesn't expect the Message wrapper and so works fine with plain saved Store data. After I fixed the serialization to save the data in Message encoding, decodeMessageBS worked fine on that data.
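To illustrate the difference, here is a minimal round-trip sketch. It assumes a version of store whose Data.Store.Streaming exports the Message constructor, encodeMessage and decodeMessageBS, and whose System.IO.ByteBuffer exports with. The in-memory chunk list, the popper built on an IORef, and the buffer size of 1024 are made up for the example and stand in for the real input stream.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import qualified Data.ByteString as BS
import Data.IORef (newIORef, atomicModifyIORef')
import Data.Store (encode)
import Data.Store.Streaming (Message (..), decodeMessageBS, encodeMessage)
import qualified System.IO.ByteBuffer as BB

main :: IO ()
main = do
  let values = [1, 2, 3] :: [Int]
      -- Streaming format: wrap each value in Message before encoding,
      -- so that every payload is preceded by the framing header.
      framed = map (encodeMessage . Message) values

  -- Plain `encode` produces only the payload, so the two lengths differ.
  print ( BS.length (encode (1 :: Int))
        , BS.length (encodeMessage (Message (1 :: Int))) )

  -- Hypothetical chunk source: hands out one encoded chunk per call,
  -- then Nothing once the list is exhausted.
  ref <- newIORef framed
  let popper = atomicModifyIORef' ref $ \chunks -> case chunks of
        []         -> ([], Nothing)
        (c : rest) -> (rest, Just c)

  BB.with (Just 1024) $ \bb -> do
    let loop = do
          r <- decodeMessageBS bb popper
          case r of
            Nothing -> return ()                       -- buffer empty, no more input
            Just (Message (x :: Int)) -> print x >> loop
    loop
```

If framed were built with plain encode instead of encodeMessage . Message, the same loop would be expected to fail with a PeekException, which matches the behaviour described in the question.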