I am trying to read incremental data from my data source using Scala and Spark. Before hitting the source tables, I calculate the min and max of the partition column used in my code. This happens inside a Future, in a method of the class GetSourceMeta, as given below.

def getBounds(keyIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
    val boundsMap = scala.collection.mutable.Map[String, String]()
    keyIdMap.keys.foreach(table => if(!keyIdMap(table).contains("Invalid")) {
        val minMax    = s"select max(insert_tms) maxTms, min(insert_tms) minTms from schema.${table} where source='DB2' and key_id in (${keyIdMap(table)})"
        println("MinMax: " + minMax)
        val boundsDF  = spark.read.format("jdbc").option("url", con.getConUrl()).option("dbtable", s"(${minMax}) as ctids").option("user", con.getUserName()).option("password", con.getPwd()).load()
        try {
            // Stored as "minTms,maxTms". Both are NULL when no rows match, so toString throws the NullPointerException handled below.
            val minMaxTms = boundsDF.select("minTms").head.getTimestamp(0).toString + "," + boundsDF.select("maxTms").head.getTimestamp(0).toString
            println("Bounds: " + minMaxTms)
            boundsMap += (table -> minMaxTms)
        } catch {
            case np: java.lang.NullPointerException =>  { println("No data found") }
            case e: Exception => { println(s"Unknown exception: $e") }
        }
    })
    boundsMap
}

I am calling the above method in my main method as:

object LoadToCopyDB {
    val conf = new SparkConf().setAppName("TEST_YEAR").set("some parameters")
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
        val gsm = new GetSourceMeta()
        val minMaxKeyMap = gsm.getBounds(keyIdMap).onComplete {
          case Success(values) => values.foreach(println)
          case Failure(f)      => f.printStackTrace
        }
    }
}

The onComplete callback didn't print any values, so I tried andThen as below, and that didn't help either.

val bounds: Future[scala.collection.mutable.Map[String, String]] = gpMetaData.getBounds(incrementalIds) andThen {
  case Success(outval) => outval.foreach(println)
  case Failure(e)      => println(e)
}

At first, the main thread exited before the Future returned by getBounds had a chance to run, so none of the println output from the Future appeared on the terminal. I found out that the main thread has to Await the Future for it to complete. But when I use Await in main along with onComplete:

Await.result(bounds, Duration.Inf)

The compiler gives an error:

Type mismatch, expected: Awaitable[NotInferedT], actual:Unit

If I declare the val minMaxKeyMap as Future[scala.collection.mutable.Map[String, String]], the compiler says: Expression of type Unit doesn't conform to expected type Future[mutable.Map[String,String]]

I tried to print the values of bounds after the Await statement but that just prints an empty Map.

I don't understand how to fix this. Could anyone tell me what I should do to make the Future run properly?

1 Answer

mikealgo On Best Solutions

In cases like this, it is always better to follow the types. The method onComplete returns Unit, not a Future, so its result can't be passed to Await.
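To make the type difference concrete, here is a minimal sketch that runs without Spark. The object name FollowTheTypes and the method fakeBounds are hypothetical stand-ins for your gsm.getBounds, and the 10 second timeout is just an assumption for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FollowTheTypes {
  // Hypothetical stand-in for gsm.getBounds: no Spark, just a Future of a Map.
  def fakeBounds(): Future[Map[String, String]] =
    Future(Map("table_a" -> "minTms,maxTms"))

  def main(args: Array[String]): Unit = {
    // onComplete only registers a callback. Its result is Unit, so there is
    // nothing here that could be handed to Await.result.
    val registered: Unit = fakeBounds().onComplete {
      case Success(values) => values.foreach(println)
      case Failure(f)      => f.printStackTrace()
    }

    // andThen also runs a side effect, but it returns a Future carrying the
    // original result, so this value can be awaited.
    val bounds: Future[Map[String, String]] = fakeBounds().andThen {
      case Success(values) => values.foreach(println)
      case Failure(f)      => f.printStackTrace()
    }

    val result = Await.result(bounds, 10.seconds)
    println(s"Awaited entries: ${result.size}")
  }
}
```

The callback registered with onComplete may or may not print before main returns, because nothing waits for it. The andThen version is awaited explicitly, so its output is printed before the program exits.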

If you want to get back a Future of some type, you have to map or flatMap the value and return, for example, an Option. In this case it does not matter what you return: you only want Await to wait for the result and print it, or print a stack trace on failure. You can handle a possible exception in recover. In your code it would look like this:

val minMaxKeyMap: Future[Option[Any]] = gsm.getBounds(keyIdMap).map { values =>
   values.foreach(println); None
}.recover {
   case e: Throwable =>
          e.printStackTrace(); None
}

Note that the recover part has to return an instance of the same type as the map part. After that, you can apply Await to the Future, and you will get the results printed. It is not the prettiest solution, but it will work in your case.
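As a rough end-to-end sketch of that approach, the following runs without Spark. AwaitBounds, fakeBounds, and printed are hypothetical names, fakeBounds stands in for gsm.getBounds, and the sketch returns the values wrapped in an Option rather than discarding them, which is a choice made for the example and not something your code requires.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitBounds {
  // Hypothetical stand-in for gsm.getBounds; fail = true simulates an error.
  def fakeBounds(fail: Boolean): Future[Map[String, String]] = Future {
    if (fail) throw new IllegalStateException("source unavailable")
    Map("table_a" -> "minTms,maxTms")
  }

  // map handles the success path and recover the failure path.
  // Both branches produce the same type, Option[Map[String, String]].
  def printed(fail: Boolean): Future[Option[Map[String, String]]] =
    fakeBounds(fail)
      .map { values => values.foreach(println); Option(values) }
      .recover { case e: Throwable => println(s"Failed: $e"); None }

  def main(args: Array[String]): Unit = {
    // Await blocks the main thread until each Future has completed.
    val ok = Await.result(printed(fail = false), 10.seconds)
    val ko = Await.result(printed(fail = true), 10.seconds)
    println(s"success case defined: ${ok.isDefined}")
    println(s"failure case empty: ${ko.isEmpty}")
  }
}
```

A finite timeout is used here instead of Duration.Inf so a hung query cannot block the driver forever; whether that suits your job depends on how long the JDBC queries normally take.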