Handling disconnects and reconnects in Spray and custom actor with Chunked Response


I'm new to Spray and trying to implement a client that receives a stream of data from a server. My current code is shown below. The client sends a request to the HTTP server, which then responds with a stream of data (as chunked responses). I have verified that it connects to the server and receives responses.

However, it's not clear to me how I should handle disconnects and reconnects, for example (A) if I lose network connectivity, or (B) if my client times out because the server has no data to send at that moment. Any pointers or examples would be appreciated.

UPDATE

First, I want to detect events (A) and (B) above. When the client experiences either of them, it should re-establish the connection and re-authenticate so that it can get back to the connected state and continue receiving the data stream.

import spray.http._
import spray.client.pipelining._
import akka.actor._
import spray.can.Http
import akka.io.IO
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart

trait Authorization {
  def authorize: HttpRequest => HttpRequest
}

trait OAuthAuthorization extends Authorization {

  import OAuth._

  val consumer = ???
  val token = ???
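  // Pass both the consumer and the token; the original passed `token` twice and never used `consumer`.
  val authorize: (HttpRequest) => HttpRequest = OAuthorizer(consumer, token)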
}


class StreamerActor(uri: Uri) extends Actor with ActorLogging {
  this: Authorization =>
  val io = IO(Http)(context.system)

  //Initial state of the Actor
  def receive = ready

  def ready: Receive = {
    case query: String =>
      val body = HttpEntity(ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"$query")
      val req = HttpRequest(HttpMethods.POST, uri = uri, entity = body) ~> authorize
      sendTo(io).withResponsesReceivedBy(self)(req)
      //As soon as you get the data you should change state to "connected" by using a "become"
      context become connected
  }

  def connected: Receive = {
    case ChunkedResponseStart(_) => log.info("Chunked Response started.")
    case MessageChunk(entity, _) => log.info(entity.asString)
    case ChunkedMessageEnd(_, _) => log.info("Chunked Message Ended")
    case Http.Closed => log.info("HTTP closed")
    case _ =>
  }
}

object SprayStreamer extends App {

  val system = ActorSystem("simple-spray-http")
  // Use a lowercase name so the value does not shadow the spray.http.Uri companion object.
  val uri = Uri("https://.....")
  val streamClient = system.actorOf(Props(new StreamerActor(uri) with OAuthAuthorization), name = "spray-client")
  streamClient ! "keyword"

}
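
To make the behavior described in the UPDATE concrete, here is a minimal sketch of the state transitions I have in mind, written as a pure function so it can be reasoned about separately from the actor. All names in it (State, Event, ReconnectPolicy, the backoff values) are hypothetical and are not part of Spray or Akka; how events (A) and (B) actually surface as messages in Spray is exactly what I am unsure about.

```scala
import scala.concurrent.duration._

// Hypothetical types for illustration only; none of these come from Spray or Akka.
sealed trait State
case object Ready extends State
case object Connected extends State
final case class Reconnecting(attempt: Int) extends State

sealed trait Event
case object QuerySent extends Event        // initial authorized request dispatched
case object ConnectionLost extends Event   // case (A): network connectivity lost
case object IdleTimeout extends Event      // case (B): no data arrived before the timeout
case object Reauthenticated extends Event  // a new authorized request was accepted

object ReconnectPolicy {
  // Assumed backoff bounds; tune to the server's expectations.
  val baseDelay: FiniteDuration = 1.second
  val maxDelay: FiniteDuration = 60.seconds

  // Exponential backoff: 1s, 2s, 4s, ... capped at maxDelay.
  def delayFor(attempt: Int): FiniteDuration = {
    val factor = 1L << math.min(attempt, 16) // bound the shift to avoid overflow
    (baseDelay * factor).min(maxDelay)
  }

  // Pure transition function: given the current state and an event, return the next state.
  def next(state: State, event: Event): State = (state, event) match {
    case (Ready, QuerySent)                              => Connected
    case (Connected, ConnectionLost | IdleTimeout)       => Reconnecting(0)
    case (Reconnecting(n), ConnectionLost | IdleTimeout) => Reconnecting(n + 1)
    case (Reconnecting(_), Reauthenticated)              => Connected
    case (s, _)                                          => s // ignore anything else
  }
}
```

In the actor, the idea would be to translate whatever messages signal (A) or (B) into these events, schedule a retry after `ReconnectPolicy.delayFor(attempt)`, and re-send the authorized request on each retry.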

These are the contents of my resources/application.conf

spray {
  can.server {
    idle-timeout = 90 s
    request-timeout = 80 s
    connection-timeout = 90 s
    request-chunk-aggregation-limit = 0
  }

  can.client {
    idle-timeout = 90 s
    request-timeout = 80 s
    connection-timeout = 90 s
    response-chunk-aggregation-limit = 0
  }

  io.confirm-sends = on

}