Cancelling fs2 Streams When Encountering an Error in an Akka Actor


I have the following Akka actor, which sends itself a message at a fixed time interval. To do this, I use an fs2 stream that emits a signal at that interval:

private[this] val someTask =
  if (bindings.appConfig.isPersistentWSConn) {
    Some(StreamUtils.getATaskAsStream(bindings.appConfig.pingInterval)(x => self ! x))
  } else {
    logger.info(s"Not creating a Persistent WebSocket connection")
    None
  }

override def receive: Receive = {
  case jsValue: JsValue =>
    jsValue.validate[ValidateSomething].asEither match {
      case Right(ocppCall) => doSomething match { // doSomething: placeholder for the real handler call, which yields a Try
        case Failure(fail) => sink ! JsError(s"${fail.getMessage}")
        case Success(succ) => sink ! Json.toJson(succ)
      }
      case Left(errors) =>
        sink ! Json.toJson(s"error -> ${errors.head._2}") 
    }
  case x: Int =>
    logger.info(s"Elem: $x")
    Future.successful(heartbeatResponse(2,"HeartbeatRequest")).onComplete {
      case Failure(fail) => sink ! JsError(s"${fail.getMessage}")
      case Success(succ) => sink ! Json.toJson(succ)
    }
  case msg: Any =>
    logger.warn(s"Received unknown message ${msg.getClass.getTypeName} that cannot be handled, " +
      s"eagerly closing websocket connection")
    // This is where I want to cancel the stream
    self ! PoisonPill
}

I have this function, which gives me a trigger at fixed time intervals:

  // Invokes `callback` once per `pingInterval` tick. (Evaluating the callback in a
  // separate single-element stream would run it only once, not on every tick.)
  def getATaskAsStream(pingInterval: FiniteDuration)(callback: Int => Unit): Stream[IO, Unit] =
    Stream.awakeEvery[IO](pingInterval).evalMap(_ => IO(callback(0)))

How can I cancel the stream before I send a PoisonPill to the actor?
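
One possible direction, sketched here under assumptions rather than as a definitive answer: a Stream is only a description, so it has to be compiled and run somewhere, and whatever runs it can hand back a cancel handle. The sketch below assumes cats-effect 3 and fs2 3, where IO has unsafeToFutureCancelable. The names TickStreamCancellation and startTicks are hypothetical and not part of the code above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import cats.effect.IO
import cats.effect.unsafe.implicits.global // assumption: cats-effect 3 default runtime
import fs2.Stream

object TickStreamCancellation {

  // Hypothetical helper: starts the ticking stream in the background and
  // returns a function that cancels it when invoked.
  def startTicks(pingInterval: FiniteDuration)(callback: Int => Unit): () => Future[Unit] = {
    val ticks: Stream[IO, Unit] =
      Stream.awakeEvery[IO](pingInterval).evalMap(_ => IO(callback(0)))

    // unsafeToFutureCancelable returns (result future, cancel function);
    // only the cancel function is needed here.
    val (_, cancel) = ticks.compile.drain.unsafeToFutureCancelable()
    cancel
  }

  def main(args: Array[String]): Unit = {
    val counter = new AtomicInteger(0)
    val cancel = startTicks(100.millis) { _ => counter.incrementAndGet(); () }

    Thread.sleep(550)                 // let a few ticks fire
    Await.result(cancel(), 5.seconds) // wait until cancellation has completed
    val atCancel = counter.get()

    Thread.sleep(300)                 // no further ticks should arrive
    println(s"ticks at cancel: $atCancel, ticks after waiting: ${counter.get()}")
  }
}
```

In the actor, the handle returned by such a helper could be kept in a field and invoked right before self ! PoisonPill, or in an overridden postStop() so that the stream is also stopped when the actor terminates for any other reason. On cats-effect 2 the method used here is not available; running the stream with interruptWhen and a signal would be an alternative to look at.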
