I have created WS client on play framework2.6 which connects to REST API. This rest API is continuously sending chunks/stream of data. But then for few seconds REST API server goes down and is again restarted. I have written client to restart stream but it seems its not able to restart it.
def employesCompany() = Action.async { implicit request: Request[AnyContent] =>
val client = ws.underlying
val wsrequest: WSRequest = ws.url(playConfiguration.get[String]("employes.service.url"))
val futureResponse: Future[WSResponse] = wsrequest.stream()
futureResponse.flatMap { response =>
val source: Source[ByteString, Any] = response.bodyAsSource
val publisher = source.toMat(Sink.asPublisher(true))(Keep.right).run()
val flow = Flow[ByteString].map(res => res.utf8String)
val ns = Source.fromPublisher(publisher).via(flow)
val restartSource = RestartSource.withBackoff(Duration.apply(1, "sec"), Duration.apply(3, "sec"), 0.2) { () =>
ns.map {
elem =>
println(elem)
elem
}
}
Future.successful(Ok.chunked(restartSource via EventSource.flow).as(ContentTypes.EVENT_STREAM))
}
}
I am facing below error:
ERROR] [12/20/2020 18:35:27.585] [play-dev-mode-akka.actor.default-dispatcher-8] [RestartWithBackoffSource(akka://play-dev-mode)] **Restarting graph due to failure
java.io.IOException: An existing connection was forcibly closed by the remote host**
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at play.shaded.ahc.io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:368)
at play.shaded.ahc.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:891)
at play.shaded.ahc.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:277)
at play.shaded.ahc.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
at play.shaded.ahc.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at play.shaded.ahc.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
The problem is that the request is already performed when you call
wsrequest.stream(), but you want it to be performed later when the Source is materialized. You therefore need to delay the creation of theFutureby not callingwsrequest.streamyourself but having Akka Stream do it.I haven't worked with Akka Stream in a while, but something like this should work: