Outgoing connection stream closed

I have an actor with the behaviour:

def receive: Receive = {
  case Info(message) =>
    val res = send("INFO:" + message)
    installAckHook(res)
  case Warning(message) =>
    val res = send("WARNING:" + message)
    installAckHook(res)
  case Error(message) =>
    val res = send("ERROR:" + message)
    installAckHook(res)
}

private def installAckHook[T](fut: Future[T]): Unit = {
  val answerTo = sender()

  fut.onComplete {
    case Success(_) => answerTo ! "OK"
    case Failure(ex) => answerTo ! ex
  }
}

private def send(message: String): Future[HttpResponse] = {
  import context.system
  val payload: Payload = Payload(text = message,
    username = slackConfig.username, icon_url = slackConfig.iconUrl,
    icon_emoji = slackConfig.iconEmoji, channel = slackConfig.channel)
    .validate
  Http().singleRequest(RequestBuilding.Post(slackConfig.hookAddress, payload))
}

And a test:

val actorRef = system.actorOf(SlackHookActor.props(SlackEndpointConfig(WebHookUrl,iconEmoji = Some(":ghost:"))))
actorRef ! Error("Some error message")
actorRef ! Warning("Some warning message")
actorRef ! Info("Some info message")
receiveN(3)

In the afterAll() method I shut down the actor system using TestKit.

It works: the requests reach the server, but Akka Streams logs errors:

[ERROR] [06/26/2015 11:34:55.118] [SlackHookTestingSystem-akka.actor.default-dispatcher-10] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.120] [SlackHookTestingSystem-akka.actor.default-dispatcher-13] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.121] [SlackHookTestingSystem-akka.actor.default-dispatcher-8] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)

Since the Future has completed, I would expect the outgoing connection to be closed already. Is this a bug, or am I missing something?

There is 1 answer

Answered by Somatik

You also need to shut down the HTTP connection pools before shutting down the actor system, something like:

Http().shutdownAllConnectionPools().onComplete{ _ =>
  system.shutdown()
}

The Akka HTTP testkit may provide some helpers for this.
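For a test, the same idea can be wrapped in a small blocking helper that is called from afterAll(). The following is only a sketch under a few assumptions: HttpTestShutdown, shutdownHttpThenSystem and the 10-second default timeout are made-up names and values, not part of Akka, and it assumes the pools were created through Http().singleRequest as in the question. The likely reason it helps is that singleRequest uses a cached connection pool whose streams keep running after the response Future has completed, so stopping the actor system first terminates them abruptly.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.testkit.TestKit

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper (not part of Akka): closes the HTTP connection pools
// first, then stops the actor system.
object HttpTestShutdown {

  def shutdownHttpThenSystem(system: ActorSystem,
                             timeout: FiniteDuration = 10.seconds): Unit = {
    try {
      // Block until all connection pools of this system have shut down.
      // Await.ready throws a TimeoutException if this takes longer than `timeout`.
      Await.ready(Http(system).shutdownAllConnectionPools(), timeout)
    } finally {
      // Stop the actor system even if closing the pools timed out.
      TestKit.shutdownActorSystem(system)
    }
  }
}
```

In the test class this could then be used as: override def afterAll(): Unit = HttpTestShutdown.shutdownHttpThenSystem(system). Blocking is acceptable here because afterAll() runs on the test thread, not inside an actor.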