How to use Web.Scotty stream with ResourceT?

129 views Asked by At

Having touched Haskell since 2013, I am writing a small Web.Scotty service to manage S3 bucket (with Amazonka-2.0).

The Web.Scotty part and Amazonka was pretty clear, but I am not sure how to make it work together:

main :: IO ()
main = do
    env <- Amazonka.newEnv Amazonka.discover
    scotty 3000 (app env)

app :: Amazonka.Env -> ScottyM ()
app env = do
    get "/stream-file" $ do
        runResourceT $ do
            resp <- runResourceT $ Amazonka.send env (newGetObject "bucket" "file")

            (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))

            lift $ stream $ \send flush -> do
                (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)

I tried removing runResourceT in here, without any change:

resp <- Amazonka.send env (newGetObject "bucket" "file")

This works and prints to console successfully:

(resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . print))

This doesn't work (if print section is commented out) with an error:

lift $ stream $ \send flush -> do
    (resp ^. getObjectResponse_body) `sinkBody` (CC.map fromByteString .| CC.mapM_ (liftIO . send) >> liftIO flush)

Error:

HttpExceptionRequest Request {
  host                 = "bucket.s3.us-east-1.amazonaws.com"
  port                 = 443
  secure               = True
  requestHeaders       = [("X-Amz-Content-SHA256",""),("X-Amz-Date",""),("Host","bucket.s3.us-east-1.amazonaws.com"),("Authorization","<REDACTED>")]
  path                 = "/file"
  queryString          = ""
  method               = "GET"
  proxy                = Nothing
  rawBody              = False
  redirectCount        = 0
  responseTimeout      = ResponseTimeoutMicro 70000000
  requestVersion       = HTTP/1.1
  proxySecureMode      = ProxySecureWithConnect
}
 ConnectionClosed

What am I missing?

2

There are 2 answers

0
fghibellini On

If you try:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Data.Binary.Builder (fromByteString)
import Web.Scotty
import Web.Scotty
import Data.Conduit ((.|), ConduitT, yield, runConduit)
import qualified Data.Conduit.Combinators as CC
import Control.Monad.IO.Class
import Control.Lens
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)

import Data.IORef

slowSource :: MonadIO m => IORef Bool -> ConduitT a ByteString m ()
slowSource state = do
  x <- liftIO $ readIORef state
  yield ("state: " <> (if x then "T" else "F") <> "\n")
  liftIO $ threadDelay 1000000
  slowSource state

main :: IO ()
main = do
    state <- newIORef False
    scotty 3000 (app state)

app :: IORef Bool -> ScottyM ()
app state = do
    get "/stream-file" $ do
      liftIO $ writeIORef state True

      stream $ \send flush -> do
          runConduit $ slowSource state .| CC.map fromByteString .| CC.mapM_ (\chunk -> liftIO (send chunk >> flush))

      liftIO $ writeIORef state False

you'll see:

curl http://localhost:3000/stream-file
state: F
state: F
state: F
state: F
state: F
^C

which shows that stream indeed only "sets up" the pipeline, but it's actually executed after the handler completes i.e. after your resources are deallocated (in your case the connection to AWS).

0
K. A. Buhr On

It looks like Amazonka requires that the ResourceT in which the Amazonka.send action is executed remain open until the body conduit is actually streamed. This is kind-of, sort-of, half-documented in the Amazonka.Response module.

In your code, the stream call sets the streaming action, but doesn't actually execute sinkBody, so the outer ResourceT wraps up and allows the connection to be closed before Scotty invokes the streaming action including execution of the sinkBody.

It seems both safest and easiest to run a single ResourceT in your Scotty server that's opened when the server starts and only closed when the server is terminated. (I was worried that this might leak connections, but Amazonka appears to engage in enough connection management that this isn't an issue.)

To do this without giving the Scotty package major brain surgery, you can define the following function that allows you to "unlift" the ResourceT transformer -- basically, to do everything in IO with an "escape hatch" to a single shared ResourceT:

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

Armed with this function, you can run your application in a single active ResourceT context like so:

main :: IO ()
main = do
  ...
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)

where the app runs in a normal IO-based ScottyM monad, using withResourceT where needed. I've avoided sinkBody here because it invokes its own fresh runResourceT via runConduitRes. Instead, I run the body conduit manually using withResourceT:

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush

Here's my full program. I tested it, it seems to work. Connection are sometimes held open for a bit (say, 30 seconds or so), but they're eventually closed, so it doesn't seem to be leaking anything.

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Amazonka
import Amazonka.S3
import Amazonka.S3.Lens
import Conduit
import Control.Lens
import Data.Binary.Builder
import System.IO
import Web.Scotty

runWithResourceT :: ((forall m a. (MonadIO m) => ResourceT IO a -> m a) -> IO b) -> IO b
runWithResourceT act = runResourceT $ withRunInIO $ \runInIO -> act (liftIO . runInIO)

main :: IO ()
main = do
  logger <- newLogger Debug stdout
  discover <- newEnv Amazonka.discover
  let env = discover
        { Amazonka.logger = logger
        , Amazonka.region = Amazonka.Ohio
        }
  runWithResourceT $ \withResourceT -> scotty 3000 (app env withResourceT)

app :: Amazonka.Env -> (forall m a. (MonadIO m) => ResourceT IO a -> m a) -> ScottyM ()
app env withResourceT = get "/stream-file" $ do
  resp <- withResourceT $ Amazonka.send env (newGetObject "bucket" "file")
  stream $ \send flush -> do
    withResourceT $ runConduit $
      (resp ^. getObjectResponse_body._ResponseBody)
      .| mapC fromByteString
      .| mapM_C (liftIO . send)
    flush