I'm using spray 1.3.2 with Akka 2.3.6 (Akka is used only for spray).
I need to read huge files and, for each line, make an HTTP request.
I read the files line by line with an iterator, and for each item I make the request.
It runs successfully for some of the lines, but at some point it starts to fail with:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms]
At first I thought I was overloading the service, so I set spray.can.host-connector.max-connections to 1. It ran much slower, but I got the same errors.
Here is the code:
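import akka.actor.ActorSystem
import scala.concurrent.Future
import spray.client.pipelining._
import spray.http._
import spray.http.MediaTypes._

implicit val system = ActorSystem()
import system.dispatcher // ExecutionContext used by sendReceive and the futures below

// Custom media type for the EDN request bodies
val EdnType = register(
  MediaType.custom(
    mainType = "application",
    subType = "edn",
    compressible = true,
    binary = false,
    fileExtensions = Seq("edn")))

// pipelineUrl, PipelineResponse (with its unmarshaller), dataLines,
// someMoreLogic and aggrigateResults are defined elsewhere
val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String): Future[PipelineResponse] = {
  val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType, data))
  val responseFuture: Future[PipelineResponse] = pipeline(request)
  responseFuture
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - " + e) } // This is where the errors are displayed
  f.map { p => someMoreLogic(d, p) }
}
aggrigateResults(dataLines)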
I do it this way because I don't need all the data, just some aggregations.
How can I solve this and keep it entirely async?
The Akka ask timeout is implemented via firstCompletedOf, so the timer starts when the ask is initiated, not when the request is actually processed.
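What you seem to be doing is spawning a Future for each line (during the map), so all your calls are started at nearly the same time. The timeouts start counting when the futures are created, but there are no executor threads left for all the spawned actors to do their work. Hence the asks time out. This is also why setting max-connections to 1 didn't help: all the requests are still issued at once, they just wait longer in the host connector's queue while their timers keep running.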
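To illustrate the timing issue without spray or Akka, here is a minimal plain-Scala sketch. TimeoutDemo.withTimeout is a hypothetical helper (not part of spray or Akka) that, like the ask timeout described above, races the work against a timer using Future.firstCompletedOf. The timer is scheduled when withTimeout is called, not when a thread actually picks up the work. In the usage, a single-thread executor stands in for exhausted executors or a connection pool limited to one connection.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object TimeoutDemo {
  // Daemon threads so the JVM can exit while timers are still pending
  val daemonThreads: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  }

  private val scheduler = Executors.newSingleThreadScheduledExecutor(daemonThreads)

  /** Fails with a TimeoutException `d` after this call is made,
    * whether or not the work behind `f` has started running yet. */
  def withTimeout[T](f: Future[T], d: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val expired = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = expired.tryFailure(new TimeoutException(s"timed out after $d"))
    }, d.toMillis, TimeUnit.MILLISECONDS)
    // Whichever completes first (the work or the timer) decides the result
    Future.firstCompletedOf(Seq(f, expired.future))
  }
}
```

If you create five 100 ms tasks at once on a single worker thread, each wrapped with a 250 ms timeout, the first ones succeed but the later ones fail, even though each task only needs 100 ms of work: their timers were already running while they waited in the queue.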
Instead of processing "all at once", I would suggest a more flexible approach, somewhat similar to using iteratees or akka-streams: the Work Pulling Pattern (GitHub).
You provide the iterator that you already have as an Epic. Introduce a Worker actor, which performs the call and some logic. If you spawn N workers, there will be at most N lines being processed concurrently (and the processing pipeline may involve multiple steps). This way you can ensure that you are not overloading the executors, and the timeouts shouldn't happen.
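The Work Pulling Pattern itself is actor-based: idle workers ask for the next item from the Epic. As a smaller illustration of the same pulling idea, here is a sketch that uses only plain Scala Futures instead of actors. PullingWorkers.pullAndFold is a hypothetical helper, not part of spray, Akka, or the linked project. It starts a fixed number of asynchronous loops; each one pulls the next item from the shared iterator only after its previous call has completed, so at most that many calls are in flight at any time. Results are folded into running aggregates, so the whole file is never held in memory.

```scala
import scala.concurrent.{ExecutionContext, Future}

object PullingWorkers {
  /** Runs `call` over `items` with at most `workers` calls in flight.
    * Each worker folds its results into its own partial aggregate (starting from `zero`);
    * the partial aggregates are merged at the end. `zero` should be an identity for `merge`. */
  def pullAndFold[A, B, S](items: Iterator[A], workers: Int, zero: S)(call: A => Future[B])(
      combine: (S, B) => S, merge: (S, S) => S)(implicit ec: ExecutionContext): Future[S] = {

    // Iterators are not thread-safe, so pulling the next item is synchronized
    def pull(): Option[A] = items.synchronized {
      if (items.hasNext) Some(items.next()) else None
    }

    // One worker: pull an item, wait for its result, fold it in, then pull again
    def worker(acc: S): Future[S] = pull() match {
      case Some(a) => call(a).flatMap(b => worker(combine(acc, b)))
      case None    => Future.successful(acc)
    }

    Future.sequence(List.fill(workers)(worker(zero))).map(_.foldLeft(zero)(merge))
  }
}
```

Applied to the question, this would look roughly like PullingWorkers.pullAndFold(dataLines, 4, zeroAgg)(postData)(combine, merge), where zeroAgg, combine and merge are hypothetical stand-ins for the aggregation done in someMoreLogic and aggrigateResults. Keeping the number of workers at or below max-connections means requests should not pile up in the connector's queue. A failed request stops its worker's loop and fails the overall future, so wrap postData with recover if you would rather skip bad lines.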