In Haskell, how can I abort a calculation when a web client disconnects

524 views Asked by At

I have a Haskell-based web service that performs a calculation that for some input can take a really long time to finish. ("really long" here means over a minute)

Because performing that calculation takes all the CPU available on the server, I place incoming requests in a queue (well, actually a stack for reasons that have to do with the typical client, but that's besides the point) when they arrive and service them when the currently running calculation finishes.

My problem is that the clients don't always wait long enough, and sometimes time out on their end, disconnect, and try a different server (well, they try again and hit the elb, and usually get a different instance). Also, occasionally the calculation the web client was asking for will become obsolete because of external factors and the web client will be killed.

In those cases I'd really like to be able to detect that the web client has gone away before I pull the next request off the stack and start the (expensive) calculation. Unfortunately, my experience with snap leads me to believe that there's no way in that framework to ask "is the client's TCP connection still connected?" and I haven't found any documentation for other web frameworks that cover the "client disconnected" case.

So is there a Haskell web framework that makes it easy to detect whether a web client has disconnected? Or failing that, is there one that at least makes it possible?

(I understand that it may not be possible to be absolutely certain in all cases whether a TCP client is still there without sending data to the other end; however, when the client actually sends RST packets to the server and the server's framework doesn't let the application code determine that the connection is gone, that's a problem)


Incidentally, though one might suspect that warp's onClose handler would let you do this, this fires only when a response is ready and written to the client so is useless as a way of aborting a calculation in progress. There also seems to be no way to get access to the accepted socket so as to set SO_KEEPALIVE or similar. (There are ways to access the initial listening socket, but not the accepted one)

2

There are 2 answers

0
Daniel Martin On BEST ANSWER

So I found an answer that works for me and it might work for someone else.

It turns out that you can in fact mess around enough with the internals of Warp to do this, but then what you're left with is a basic version of Warp and if you need things like logging, etc., will need to add other packages on to that.

Also, note that so-called "half-closed" connections (when the client closes their sending end, but is still waiting for data) will be detected as closed, interrupting your calculation. I don't know of any HTTP clients that deal in half-closed connections, but just something to be aware of.

Anyway, what I did was first copy the functions runSettings and runSettingsSocket exposed by Network.Wai.Handler.Warp and Network.Wai.Handler.Warp.Internal and made versions that called a function I supplied instead of WarpI.socketConnection, so that I have the signature:

runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
             -> Wai.Application -> IO ()

This required copying out a few helper methods, like setSocketCloseOnExec and windowsThreadBlockHack. The double-IO signature there might look weird, but it's what you want - the outer IO is run in the main thread (that calls accept) and the inner IO is run in the per-connection thread that is forked after accept returns. The original Warp function runSettings is equivalent to:

\set -> runSettings' set (WarpI.socketConnection >=> return . return)

Then I did:

data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared

runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
  runSettings' set (WarpI.socketConnection >=> return . wrapConn)
  where
    -- Fork a 'monitor' thread that does nothing but attempt to
    -- perform a read from conn in a loop 1/sec, and wrap the receive
    -- methods on conn so that they first consume from the stuff read
    -- by the monitoring thread. If the monitoring thread sees
    -- end-of-file (signaled by an empty string read), raise
    -- ClientDisappered on the per-connection thread.
    wrapConn conn = do
      tid <- myThreadId
      nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
      semaphore <- newMVar ()
      readerCount <- newIORef (0 :: Int)
      monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
      return $ conn {
        WarpI.connClose = throwTo monitorThread ClientDisappeared
                          >> WarpI.connClose conn
        , WarpI.connRecv = newRecv nxtBstr semaphore readerCount
        , WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
        }
      where
        newRecv :: MVar ByteString -> MVar () -> IORef Int
                -> IO ByteString
        newRecv nxtBstr sem readerCount =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
                                   case w of
                                     Just w' -> return w'
                                     Nothing -> WarpI.connRecv conn
          )

        newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
                   -> WarpI.Buffer -> WarpI.BufSize -> IO Bool
        newRecvBuf nxtBstr sem readerCount buf bufSize =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do
              (fulfilled, buf', bufSize') <-
                if bufSize == 0 then return (False, buf, bufSize)
                else
                  do w <- tryTakeMVar nxtBstr
                     case w of
                       Nothing -> return (False, buf, bufSize)
                       Just w' -> do
                         let wlen = B.length w'
                         if wlen > bufSize
                           then do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') bufSize
                                   putMVar nxtBstr (B.drop bufSize w')
                                   return (True, buf, 0)
                           else do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') wlen
                                   return (wlen == bufSize, plusPtr buf wlen,
                                           bufSize - wlen)
              if fulfilled then return True
                else WarpI.connRecvBuf conn buf' bufSize'
          )
        dropClientDisappeared :: ClientDisappeared -> IO ()
        dropClientDisappeared _ = return ()
        monitor tid nxtBstr sem st =
          catch (monitor' tid nxtBstr sem st) dropClientDisappeared

        monitor' tid nxtBstr sem st = do
          (hitEOF, readerCount) <- withMVar sem $ \_ -> do
            w <- tryTakeMVar nxtBstr
            case w of
              -- No one picked up our bytestring from last time
              Just w' -> putMVar nxtBstr w' >> return (False, 0)
              Nothing -> do
                w <- WarpI.connRecv conn
                putMVar nxtBstr w
                readerCount <- readIORef st
                return (B.null w, readerCount)
          if hitEOF && (readerCount == 0)
            -- Don't signal if main thread is also trying to read -
            -- in that case, main thread will see EOF directly
            then throwTo tid ClientDisappeared
            else do threadDelay oneSecondInMicros
                    monitor' tid nxtBstr sem st
        oneSecondInMicros = 1000000
2
Mark Seemann On

Assuming that 'web service' means HTTP(S)-based clients, one option is to use a RESTful approach. Instead of assuming that clients are going to stay connected, the service could accept the request and return 202 Accepted. As the HTTP status code specification outlines:

The request has been accepted for processing, but the processing has not been completed [...]

The 202 response is intentionally non-committal. Its purpose is to allow a server to accept a request for some other process (perhaps a batch-oriented process that is only run once per day) without requiring that the user agent's connection to the server persist until the process is completed. The entity returned with this response SHOULD include an indication of the request's current status and either a pointer to a status monitor or some estimate of when the user can expect the request to be fulfilled.

The server immediately responds with a 202 Accepted response, and also includes a URL that the client can use to poll for status. One option is to put this URL in the response's Location header, but you can also put the URL in a link in the response's body.

The client can poll the status URL for status. Once the calculation finishes, the status resource can provide a link to the finished result.

You can add cache headers to the status resource and final result if you're concerned that the clients will be polling too hard.

REST in Practice outlines the general concepts, while the RESTful Web Services Cookbook has lots of good details.

I'm not saying that you can't do something with either HTTP or TCP/IP (I don't know), but if you can't, then the above is a tried-and-true solution to similar problems.

Obviously, this is completely independent on programming language, but it's been my experience that REST and algebraic data types go well together.