I'm new to Spray and trying to implement a client that receives a stream of data from a server. My current code looks like the following. The client send a request to the HTTP server which then responds back with a stream of data (as chunked responses).I checked that it connects to the server and can get responses back.
However, it's not clear to me how I should be handling disconnects and reconnects. For example, (A) if I loose network connectivity or (B) my client timeouts because the server may not have any data to send at that moment. Any pointers/examples would be appreciated.
UPDATE
First, I want to detect events (A) and (B) above.
When the client experiences either (A) or (B) above it should reestablish connection and re-authenticate so that it can continue (get back to connected
state to get the data stream.
import spray.http._
import spray.client.pipelining._
import akka.actor._
import spray.can.Http
import akka.io.IO
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart
trait Authorization {
def authorize: HttpRequest => HttpRequest
}
trait OAuthAuthorization extends Authorization {
import OAuth._
val consumer = ???
val token = ???
val authorize: (HttpRequest) => HttpRequest = OAuthorizer(token, token)
}
class StreamerActor(uri: Uri) extends Actor with ActorLogging {
this: Authorization =>
val io = IO(Http)(context.system)
//Initial state of the Actor
def receive = ready
def ready: Receive = {
case query: String =>
val body = HttpEntity(ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"$query")
val req = HttpRequest(HttpMethods.POST, uri = uri, entity = body) ~> authorize
sendTo(io).withResponsesReceivedBy(self)(req)
//As soon as you get the data you should change state to "connected" by using a "become"
context become connected
}
def connected: Receive = {
case ChunkedResponseStart(_) => log.info("Chunked Response started.")
case MessageChunk(entity, _) => log.info(entity.asString)
case ChunkedMessageEnd(_, _) => log.info("Chunked Message Ended")
case Http.Closed => log.info("HTTP closed")
case _ =>
}
}
object SprayStreamer extends App {
val system = ActorSystem("simple-spray-http")
val Uri = Uri("https://.....")
val streamClient = system.actorOf(Props(new StreamerActor(Uri) with OAuthAuthorization), name = "spray-client")
streamClient ! "keyword"
}
These are the contents of my resources/application.conf
spray {
can.server {
idle-timeout = 90 s
request-timeout = 80 s
connection-timeout = 90 s
reqiest-chunk-aggregation-limit = 0
}
can.client {
idle-timeout = 90 s
request-timeout = 80 s
connection-timeout = 90 s
response-chunk-aggregation-limit = 0
}
io.confirm-sends = on
}