Problems with Incompletely Consumed Akka Streams

We have a service that uses Alpakka 3.0.4 (Scala 2.13) to stream multiple large files from S3, zip them, and send the zipped stream out as an HTTP response. The idea is to start sending before zipping is complete, to start zipping before all the files have arrived, and so on, all carefully managed in terms of backpressure, which is exactly what Akka Streams excel at.

The HTTP server, by the way, is Aleph, because the code base is actually Clojure and we call Scala from Clojure (not a problem at all; in fact, it is much easier than calling Java from Clojure). Aleph supports streaming HTTP responses in chunks, which is its default mode.

Everything works fine... if the HTTP responses are fully consumed. The body of a response is the zipped input stream. Once it's fully consumed by the client, the stream is closed and Akka does its trick to close all the other streams, including the original S3 input streams.

But occasionally the client closes the connection prematurely (we are still investigating why), before the zipped stream is fully consumed. The end user just ends up with an empty zip file. What we observe on the server side is that the S3 streams aren't closed properly, which leads to a connection leak until the S3 connection pool is exhausted.

What is the proper way to handle incomplete consumption of this sort?
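
To make the question concrete, the kind of cleanup we are after would look roughly like the sketch below: watch the termination of the zipped stream and close whatever S3 streams have already been opened. This is only an illustration; openedAttachments is a hypothetical registry of the InputStreams we obtained from S3, not something in our code today.

    import java.io.InputStream
    import scala.concurrent.ExecutionContext
    import scala.util.control.NonFatal
    import akka.stream.scaladsl.Source
    import akka.util.ByteString

    // Sketch only: close every already-opened S3 stream once the zipped
    // stream terminates, whether it completed, failed, or was cancelled
    // because the client went away.
    def closeAttachmentsOnTermination[Mat](
        zipped: Source[ByteString, Mat],
        openedAttachments: => Iterable[InputStream] // hypothetical registry of opened S3 streams
    )(implicit ec: ExecutionContext): Source[ByteString, Mat] =
      zipped.watchTermination() { (mat, done) =>
        done.onComplete { _ =>
          openedAttachments.foreach { in =>
            try in.close()
            catch { case NonFatal(_) => () } // best effort
          }
        }
        mat
      }

If Akka Streams already provide a more idiomatic way to get this behavior, that is what I am after.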

Looking at the source code for the InputStreamSource class, I have noticed the following:

      override def postStop(): Unit = {
        if (!isClosed) {
          mat.tryFailure(new AbruptStageTerminationException(this))
        }
      }

      private def failStream(reason: Throwable): Unit = {
        closeInputStream()
        mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, reason))
      }

Is there any particular reason not to close the input stream in postStop, the way it is closed in failStream? And when would postStop be called if we call runWith on a graph but the resulting input stream isn't fully consumed? Or should I be looking at something else?
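
As an aside, StreamConverters.fromInputStream materializes a Future[IOResult], and it is the promise behind that future that gets failed in the snippet above. Here is a hedged sketch of hooking into it per file, so that an abrupt termination at least closes the stream we opened ourselves (attachment stands for the already-open S3 stream, as in our code below):

    import java.io.InputStream
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.control.NonFatal
    import akka.stream.IOResult
    import akka.stream.scaladsl.{Source, StreamConverters}
    import akka.util.ByteString

    // Sketch: observe the per-file IOResult and close the underlying
    // stream ourselves if the stage terminates abruptly or fails.
    def fileSource(attachment: InputStream)(implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] =
      StreamConverters
        .fromInputStream(() => attachment)
        .mapMaterializedValue { (result: Future[IOResult]) =>
          result.failed.foreach { _ => // e.g. AbruptStageTerminationException
            try attachment.close()
            catch { case NonFatal(_) => () }
          }
          result
        }

This would obviously only help for sources that actually got materialized, which may or may not be the case here.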

UPDATE, in response to comments:

The graph is very simple. The original source is constructed from a sequence of tuples (a filename and a Source built from an input stream), which flow through Archive.zip and into an input stream sink. The Clojure code is roughly as follows:

   (let [tuples (reify Iterable
                  (iterator [_]
                    ;; Some wrapper code that ultimately calls this for each item:
                    (Tuple2/apply (ArchiveMetadata/create filename)
                                  (StreamConverters/fromInputStream
                                    (reify Function0 (apply [_] attachment))))))
         source (Source/apply tuples)
         graph  (.via source (Archive/zip))
         sink   (StreamConverters/asInputStream
                  (FiniteDuration/apply ^long timeout TimeUnit/SECONDS))
         mat    (Materializer/matFromSystem system)]
     (.runWith graph sink mat))))

The Scala equivalent would be something like this:

val tuples = [some-iterable].map {
  case Something(filename, attachment) =>
    (ArchiveMetadata.create(filename),
     StreamConverters.fromInputStream(() => attachment))
}

val source = Source(tuples)
val graph = source.via(Archive.zip())
val sink = StreamConverters.asInputStream(timeout.seconds)

// Assuming an implicit materializer from an ActorSystem
graph.runWith(sink) // returns an InputStream

The input streams (attachment, above) typically are S3 streams but can also come from other repositories. They are not obtained through Akka; in the case of S3, they are obtained by calling amazonica, a thin Clojure wrapper around the Java AWS API.
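
In other words, the thunk we pass to StreamConverters.fromInputStream currently captures an InputStream that has already been opened against S3. One mitigation we could try (a sketch, not what we do today) is to defer the actual S3 call until the source is materialized; openAttachment below is a hypothetical wrapper around the amazonica call:

    import java.io.InputStream
    import scala.concurrent.Future
    import akka.stream.IOResult
    import akka.stream.scaladsl.{Source, StreamConverters}
    import akka.util.ByteString

    // Hypothetical wrapper around the amazonica/S3 call; opens the
    // object stream only when invoked.
    def openAttachment(filename: String): InputStream = ???

    // Open each attachment lazily, when the zip flow actually pulls this
    // source, instead of capturing an already-open stream in the closure.
    def lazyFileSource(filename: String): Source[ByteString, Future[IOResult]] =
      StreamConverters.fromInputStream(() => openAttachment(filename))

At least in theory, files that are never reached because the client disconnected early would then never be opened in the first place.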

UPDATE 2: To reproduce the issue, we have used curl requests with --head. If we use wget instead of curl, the zipped file is fully downloaded and the issue isn't observed (all streams are closed). If we stop early by requesting only the headers, the zipped stream (the one returned by .runWith) is closed, but the original S3 streams (attachment in the code above) are not.

It is also reproducible if a breakpoint is placed before the execution of .runWith, so there may be a race condition involved. No exception is thrown on the server side.

1 Answer

Answered by silverberry:

So, what is happening is that the Akka sink doesn't properly protect against the zipped input stream being exhausted before the sources are fully zipped. If a read is called on the resulting InputStream and there is no data yet, the entire stage closes.

The input stream is an instance of akka.stream.impl.io.InputStreamAdapter (which implements java.io.InputStream), with the following code inside the implementation of read:

    sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match {
      case Data(data) =>
        detachedChunk = Some(data)
        readBytes(a, begin, length)
      case Finished =>
        // THIS IS WHAT HAPPENS!!!
        isStageAlive = false
        -1
      case Failed(ex) =>
        isStageAlive = false
        throw new IOException(ex)
      case null        => throw new IOException("Timeout on waiting for new data")
      case Initialized => throw new IllegalStateException("message 'Initialized' must come first")
    }

I was under the impression that Akka Streams handle both backpressure (when the producer is faster than the consumer) and early exhaustion (when the consumer is faster than the producer). But apparently not!

Or maybe I'm using the wrong sink? Do I need a special sink for HTTP responses: not Sink[_, InputStream] but, say, Sink[_, Future[HttpResponse]], or rather Sink[_, manifold.deferred.IDeferred] (since we use Aleph and not Akka HTTP)? I can easily convert from a Future to an IDeferred if the appropriate sink is used.
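
For example, one direction I am wondering about (untested with Aleph, so just a sketch) is to skip the blocking InputStream bridge entirely and expose the zipped bytes as a Reactive Streams Publisher, so that a cancellation coming from the HTTP side propagates upstream:

    import akka.stream.scaladsl.Sink
    import akka.util.ByteString
    import org.reactivestreams.Publisher

    // Sketch: run the same zipping graph into a Publisher instead of an
    // InputStream (graph and the implicit materializer as in the code
    // above). If the subscriber backing the HTTP response cancels early,
    // the cancellation propagates upstream to the per-file sources.
    val publisher: Publisher[ByteString] =
      graph.runWith(Sink.asPublisher(fanout = false))

Whether Aleph can consume such a Publisher directly (or via a Manifold adapter) is something I still need to check.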

Any suggestions?