Using spray with "chunked responses"

486 views Asked by At

I am using Spray to query a REST endpoint which will return a largish amount of data with several items that should be processed. The data is a series of json objects. Is there a way to convert the response into a stream of these objects that doesn not require me to read the entire response into memory?

Reading the docs there is mention of "chunked responses", which seem to be along the lines of what I want. How do I use that in a spray-client pipeline?

1

There are 1 answers

0
Duncan Watson On

I've just implemented something like this today, thanks to the excellent article found at http://boldradius.com/blog-post/VGy_4CcAACcAxg-S/streaming-play-enumerators-through-spray-using-chunked-responses.

Essentially, what you want to do is to get hold of the RequestContext in one of your Route definitions, and get a reference to its "responder" Actor. This is the Actor by which Spray sends responses back to the client that sent the original request.

To send back a chunked response, you have to signal that the response is starting, then send the chunks one by one, and then finally signal that the response has finished. You do this via the ChunkedResponseStart, MessageChunk, and ChunkedMessageEnd classes from spray.http package.

Essentially what I end up doing is sending a response as a series of these classes like this:

0) A bunch of imports to put into the class with your Routes in, and a case object:

import akka.actor.{Actor, ActorRef}
import spray.http._
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import spray.http.HttpData
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.{ActorContext, ActorRefFactory, Props}
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import spray.json.RootJsonFormat
import spray.http.MediaTypes._

object Messages {
    case object Ack
}

1) Get a hold of the requestContext from your Route:

path ("asdf") {
  get { requestContext => {
    ... further code here for sending chunked response ...
  }
}

2) Start the response (as a JSON envelope that'll hold the response data in a JSON array called "myJsonData" in this case):

responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(`application/json`, """{"myJsonData": ["""))).withAck(Ack))

3) Iterate over your array of results, sending their JSONified versions to the response as elements in the JSON array, comma separated until the final element is sent - then no need for a trailing comma:

requestContext.responder.forward(MessageChunk(HttpData(myArray.toJson).withAck(Ack))

if (!lastElement) { // however you work this out in your code!
requestContext.responder.forward(MessageChunk(HttpData(",").withAck(Ack))
}

4) When there's nothing left to send, close the JSON envelope:

responder.forward(MessageChunk("]}").withAck(Ack))

and signal the end of the response:

responder.forward(ChunkedMessageEnd().withAck(Ack))

In my solution I have been working with Play Iteratees and Enumerators and so I have not included big chunks of code here because they are very much tied up with these mechanisms which may not be suitable for your needs. The point of the "withAck" call is that this will cause the responder to ask for an Acknowledgement message when the network signals that it's OK to accept more chunks. Ideally you would craft your code to wait for the return of the Ack message in the future before sending more chunks.

I hope that the above may give you a starter for ten at least, and as I say, these concepts are explained really well in the article I linked to!

Thanks, Duncan