Hi everyone, I need your help. Please advise on this exception:
java.util.concurrent.TimeoutException: Request timeout to ******* after 60000 ms.
Because of this exception, one of the operators in the Flink stream becomes very slow and causes a huge backlog in Kafka.
What is the exact reason that the async client comes under pressure when these timeouts occur? Does it have something to do with the async backend?
We have a retry mechanism for requests that fail. If I reduce this timeout from 60 seconds to something smaller, say 3 seconds, will a record go to the retry mechanism sooner instead of holding on to the backend?
I don't know what the downsides of that would be (maybe a huge number of records would go to retry) or how it would help in this case.
Is there any better approach for handling this?
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Instant
import java.util.{Calendar, UUID}
import io.circe._
import scala.util.Try
import org.apache.flink.util.concurrent.Executors
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.slf4j.{Logger, LoggerFactory}
import sttp.client._
import sttp.client.asynchttpclient.future.AsyncHttpClientFutureBackend
import sttp.model.Uri
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
class MySink(prop: Prop)
extends AsyncFunction[Record, CallDetails] {
// Execution context used for the completion callback in asyncInvoke; built from Flink's
// direct executor, so no extra thread pool is created for the callback.
@transient lazy val directExecutor: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.directExecutor())
// HTTP backend shared by every request issued from this function instance.
// Marked @transient like directExecutor: the function object is serialized when the job is
// shipped, and the underlying HTTP client should be created lazily on the worker rather
// than travel with the serialized function.
@transient implicit lazy val backend = AsyncHttpClientFutureBackend()
// Default target endpoint, taken from the job properties.
// NOTE(review): if prop.Uri() returns a full URL as a String, confirm that Uri.apply parses
// it rather than treating the whole string as a host name.
val uri = Uri.apply(prop.Uri())
/** Sends one record to the remote endpoint and reports the outcome to Flink.
  *
  * The HTTP call is started through `sendData`; when its future finishes, the outcome
  * (response or exception) is converted by `recordCall` and handed to `resultFuture` as a
  * single-element result. A failed HTTP call is therefore emitted as a regular element,
  * not as an operator failure.
  *
  * `resultFuture` is always completed: any exception raised while preparing the request or
  * while building/emitting the result is forwarded through `completeExceptionally`.
  *
  * @param record       input element; its session details supply the bearer token
  * @param resultFuture Flink callback that must be completed exactly once per element
  */
override def asyncInvoke(
record: BatchRecord,
resultFuture: ResultFuture[CallDetails]
): Unit = {
  // NOTE(review): the class extends AsyncFunction[Record, CallDetails] but this parameter is
  // typed BatchRecord — confirm the two names refer to the same type.
  try {
    val batchUUID = com.datastax.driver.core.utils.UUIDs.timeBased()
    // NOTE(review): timeStamp is computed but never read, while `time` (used below) is not
    // defined in this method — confirm whether `time` was meant to be `timeStamp`.
    val timeStamp = getTimestampAndDate
    val token = record.sessionDetails
    val future = sendData(
      token.token.tokenString,
      record
    )
    future.onComplete { outcome =>
      // Guard the callback: an exception thrown here would otherwise be reported to the
      // execution context only, leaving resultFuture uncompleted and the element pending
      // in the async operator.
      try {
        outcome match {
          case Success(response) =>
            resultFuture.complete(Iterable(recordCall(response, record, batchUUID, time)))
          case Failure(exception) =>
            resultFuture.complete(Iterable(recordCall(exception, record, batchUUID, time)))
        }
      } catch {
        case e: Exception =>
          resultFuture.completeExceptionally(e)
      }
    }(directExecutor)
  } catch {
    // Failures raised before the request was started (token lookup, request construction).
    case e: Exception =>
      resultFuture.completeExceptionally(e)
  }
}
/** Posts `record` as a JSON body to the target endpoint using bearer authentication.
  *
  * @param token       bearer token sent with the request
  * @param record      JSON document to send; rendered with `toString()`
  * @param testUri     optional endpoint override; the class-level `uri` is used when empty
  * @param readTimeout per-request read timeout passed to sttp; defaults to sttp's
  *                    `DefaultReadTimeout`, so existing callers keep their current behavior
  * @return a future holding the response, whose body is an `Either[String, String]`
  */
def sendData(
token: String,
record: Json,
testUri: Option[Uri] = None,
readTimeout: scala.concurrent.duration.Duration = DefaultReadTimeout
): Future[Response[Either[String, String]]] = {
  val payload = record.toString()
  // NOTE(review): the "Request timeout ... after 60000 ms" failures presumably come from the
  // one-minute default read timeout; pass a smaller readTimeout to fail fast — confirm
  // against the backend version in use.
  basicRequest
    .contentType("application/json")
    .auth.bearer(token)
    .body(payload)
    .readTimeout(readTimeout)
    .post(testUri.getOrElse(uri))
    .send()
}