import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{ broadcast => infabroadcast }
import java.io._
import java.sql.Timestamp
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger._


object StructStreaming {

  def main(s:Array[String]) {
    val sqlContext = SparkSession.builder().enableHiveSupport()
                  .master("local[*]")
                   .getOrCreate()
    import sqlContext.implicits._
    import org.apache.spark.sql.functions.{stddev_samp, var_samp}

    val v1 = sqlContext.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "source").load().toDF();

    val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true),  StructField("age", IntegerType, true), StructField("timestamp", TimestampType, true)))

    val stream = v1.selectExpr("cast (value as string) as json")
                .select(from_json($"json", schema=schema) as "data")
                .select("data.*")

    val v5 = sqlContext.sql("SELECT hive_lookup.col0 as id, hive_lookup.col1 as name, hive_lookup.col2 as age, hive_lookup.col3 as timestamp FROM default.hive_lookup").cache().toDF;

    val static = v5.groupBy(col("id")).agg(col("id"), last(col("name"), false), last(col("age"), false), last(col("timestamp"), false)).toDF()

    val result = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer").toDF()


    result.writeStream.format("console").start()

  val activeStreams = sqlContext.streams.active
  activeStreams.foreach( stream => stream.awaitTermination())
}
}

I am getting below error if I use agg() before join.

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:343)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 8, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:209)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:177)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:944)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:370)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:943)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2881)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2391)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2391)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2862)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2861)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2391)
    at org.apache.spark.sql.execution.streaming.ConsoleSink.addBatch(console.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    ... 1 more
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:209)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(statefulOperators.scala:177)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:331)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:295)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

But when I removed the statement

 v5.groupBy(col("id")).agg(col("id"), last(col("name"), false),
 last(col("age"), false), last(col("timestamp"), false)).toDF()

and use v5 (as static data frame) for joining it works fine.

As per Structured Streaming doc

As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used. - Cannot use streaming aggregations before joins.

But here in the code I am using aggregation on static data frame not on streaming data frame. I am running this code against spark 2.2.1. Can anyone please help me here? Am I doing wrong somewhere in the code?

1

There are 1 answers

0
VikAsh KuMar On BEST ANSWER

A quick update : With spark 2.3.1 jars the standalone program is working fine. No issue observed.