I have the following Akka actor that sends itself messages at a fixed time interval. For this purpose, I'm using an fs2 stream that signals the actor at that interval:
private[this] val someTask =
  if (bindings.appConfig.isPersistentWSConn) {
    Some(StreamUtils.getATaskAsStream(bindings.appConfig.pingInterval)(x => self ! x))
  } else {
    logger.info(s"Not creating a Persistent WebSocket connection")
    None
  }
override def receive: Receive = {
  case jsValue: JsValue =>
    jsValue.validate[ValidateSomething].asEither match {
      case Right(ocppCall) => // doSomething match {
          case Failure(fail) => sink ! JsError(s"${fail.getMessage}")
          case Success(succ) => sink ! Json.toJson(succ)
        }
      case Left(errors) =>
        sink ! Json.toJson(s"error -> ${errors.head._2}")
    }
  case x: Int =>
    logger.info(s"Elem: $x")
    Future.successful(heartbeatResponse(2,"HeartbeatRequest")).onComplete {
      case Failure(fail) => sink ! JsError(s"${fail.getMessage}")
      case Success(succ) => sink ! Json.toJson(succ)
    }
  case msg: Any =>
    logger.warn(s"Received unknown message ${msg.getClass.getTypeName} that cannot be handled, " +
      s"eagerly closing websocket connection")
    // HERE, I WANT TO CANCEL THE STREAM!!!!!!
    self ! PoisonPill
}
I have this function which gives me the trigger at fixed time intervals:
def getATaskAsStream(pingInterval: FiniteDuration)(callback: Int => Unit): Stream[IO, Unit] = {
  val source = Stream.awakeEvery[IO](pingInterval).map(_ => 0)
  val sink = Stream.eval(IO(callback(0)))
  source.concurrently(sink).drain
}
How can I cancel the stream before I send a PoisonPill to the Akka actor?
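To make the goal concrete, here is a minimal sketch of the kind of cancel handle I'm after. It assumes fs2 3.x on cats-effect 3 (with fs2-core on the classpath) and the default global IORuntime, neither of which is confirmed by the code above. The object name CancelStreamSketch and the helper startTask are hypothetical names introduced only for illustration. The sketch also assumes the intent is to invoke the callback on every tick, so it uses evalMap rather than the concurrently/drain combination shown earlier.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: the default global runtime is acceptable
import fs2.Stream

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CancelStreamSketch {

  // Variant of getATaskAsStream that runs the callback on every tick
  // (assumption: this is the intended behaviour).
  def getATaskAsStream(pingInterval: FiniteDuration)(callback: Int => Unit): Stream[IO, Unit] =
    Stream.awakeEvery[IO](pingInterval).evalMap(_ => IO(callback(0)))

  // Hypothetical helper: starts the stream in the background and returns a
  // cancel handle. Calling the handle requests cancellation; the returned
  // Future completes once the stream has actually stopped.
  def startTask(pingInterval: FiniteDuration)(callback: Int => Unit): () => Future[Unit] =
    getATaskAsStream(pingInterval)(callback).compile.drain.unsafeRunCancelable()

  def main(args: Array[String]): Unit = {
    val ticks = new AtomicInteger(0)

    // In the actor, the callback would be `x => self ! x`.
    val cancel = startTask(50.millis)(_ => { ticks.incrementAndGet(); () })

    Thread.sleep(300)                 // let a few ticks arrive
    Await.result(cancel(), 5.seconds) // stop the stream and wait for it to finish

    val atCancel = ticks.get()
    Thread.sleep(200)                 // no further ticks should arrive after cancellation
    println(s"ticks at cancel: $atCancel, ticks later: ${ticks.get()}")
  }
}
```

In the actor, someTask could then presumably hold an Option[() => Future[Unit]], and the handle could be invoked with someTask.foreach(cancel => cancel()) right before self ! PoisonPill, or in postStop so the stream is also stopped when the actor terminates for any other reason.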