Why doesn't my Akka Streams application terminate normally?

I wrote this simple application using the Alpakka Cassandra library:

package com.abhi

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._

object MyApp extends App {
   implicit val actorSystem = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   implicit val session = Cluster
      .builder
      .addContactPoints(List("localhost") :_*)
      .withPort(9042)
      .withCredentials("foo", "bar")
      .build
      .connect("foobar")
   val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
   val source = CassandraSource(stmt)
   // Map each Cassandra row to a Foo (both columns are read as Long)
   val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.getLong(1)))
   val sink = Sink.foreach[Foo](foo => println(s"${foo.col1}, ${foo.col2}"))
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
      s =>
      import GraphDSL.Implicits._
      source.take(10) ~> toFoo ~> s
      ClosedShape
   })
   // let us run the graph
   val future = graph.run()
   import actorSystem.dispatcher
   future.onComplete{_ =>
      session.close()
      Await.result(actorSystem.terminate(), Duration.Inf)
   }
   Await.result(future, Duration.Inf)
   System.exit(0)
}

case class Foo(col1: Long, col2: Long)

This application runs exactly as expected: it prints 10 rows on the screen.

After that, however, it hangs. When the System.exit(0) call is executed, it throws an exception:

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"

Even then, the application does not stop running; it just hangs.

I don't understand why this application doesn't terminate normally (in fact, it shouldn't even need the System.exit(0) call).

The only way to exit this application is with Ctrl+C.

1 answer

Frederic A. (best answer)

This might happen because sbt, by default, runs your code inside its own JVM instance rather than in a separate one. Your System.exit call would then exit sbt's JVM, so sbt intercepts it, which gives the result above.

Have you tried setting fork in run := true somewhere in your sbt build?
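As a rough sketch of that setting, the fragment below shows how it might look in a build.sbt. The file name and the choice between the two syntaxes are assumptions that depend on your sbt version: the slash form is the one used by recent sbt 1.x releases, while the `in` form quoted above is the older spelling.

```scala
// build.sbt (assumed location of the setting)

// Run the application in a separate, forked JVM, so that System.exit
// ends only that JVM and not the one sbt itself is running in.
run / fork := true

// Older sbt syntax for the same setting, as quoted in the answer:
// fork in run := true
```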

I'm also not sure it is a good idea to use actorSystem.dispatcher to execute your onComplete callback, because that callback then blocks one of the dispatcher's own threads while waiting for the actor system itself to terminate.
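To illustrate the general concern, here is a plain-JVM analogy that uses a java.util.concurrent thread pool instead of an actor system. It is only a sketch: the names SelfAwaitDemo and run are made up for this example, and it makes no claim about how Akka's dispatcher behaves internally. It shows that a task which waits for the termination of the very pool it runs on cannot see that termination, because the task itself is still running.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical demo object, not part of the original application.
object SelfAwaitDemo {
  // Returns (terminated as seen from inside the pool, as seen from outside).
  def run(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()

    // This task shuts the pool down and then waits for it to terminate.
    // The pool cannot terminate while this task is still running, so the
    // wait can only end by timing out.
    val inside = pool.submit(new Callable[Boolean] {
      def call(): Boolean = {
        pool.shutdown()
        pool.awaitTermination(200, TimeUnit.MILLISECONDS)
      }
    }).get()

    // Waiting from a thread that does not belong to the pool works,
    // because the task above has finished by now.
    val outside = pool.awaitTermination(5, TimeUnit.SECONDS)
    (inside, outside)
  }

  def main(args: Array[String]): Unit = {
    val (inside, outside) = run()
    println(s"terminated when awaited from inside the pool: $inside")
    println(s"terminated when awaited from outside the pool: $outside")
  }
}
```

With an infinite timeout, as in Await.result(..., Duration.Inf), the inner wait in this analogy would never return, which is why waiting from the main thread is the safer pattern.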

Something you could try instead:

import actorSystem.dispatcher
future.onComplete{ _ =>
  session.close()
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

Note that the JVM will exit without you needing to call System.exit when the only threads left are daemon threads (see for example What is Daemon thread in Java?).
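The daemon-thread rule can be tried out with a small standalone program. This is a minimal sketch using only the JDK's Thread class; DaemonThreadDemo and makeWorker are hypothetical names introduced for the example. As far as I know, the threads of an actor system's default dispatcher are non-daemon unless akka.daemonic is switched on, which would explain why the JVM stays alive until the actor system has been terminated.

```scala
// Hypothetical demo object, not part of the original application.
object DaemonThreadDemo {
  // Creates (but does not start) a thread that sleeps for one minute.
  def makeWorker(daemon: Boolean): Thread = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        try Thread.sleep(60000)
        catch { case _: InterruptedException => () }
    })
    // Must be set before the thread is started.
    worker.setDaemon(daemon)
    worker
  }

  def main(args: Array[String]): Unit = {
    val worker = makeWorker(daemon = true)
    worker.start()
    println(s"worker is a daemon thread: ${worker.isDaemon}")
    // main returns here. Because the only thread left is a daemon thread,
    // the JVM exits without waiting for the sleep to finish. With
    // daemon = false it would stay alive until the worker completes.
  }
}
```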