I used SQLContext in a Spark Streaming application as below:
case class topic_name (f1: Int, f2: Int)
val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))
theDStream.map(x => x._2).foreachRDD { rdd =>
  // Register the current batch's RDD as a temporary table
  sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
  sqlContext.sql("select count(*) from topic_name").foreach { x =>
    WriteToFile("file_path", x(0).toString)
  }
}
ssc.start()
ssc.awaitTermination()
I found I could only get the message count for each 5-second batch, because "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame". My guess is that a new sqlContext is created every 5 seconds, so the temporary table only lives for 5 seconds. I want the sqlContext and the temporary table to stay alive for the whole life cycle of the streaming application. How can I do that?
Thanks~
You are right. A SQLContext only remembers the tables registered for the lifetime of that object. So, instead of using registerTempTable, you should probably use persistent storage such as Hive, via the saveAsTable command.
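
For illustration, here is a minimal sketch of that idea. It is not tested against your setup and rests on several assumptions: Spark 1.4 to 1.6 with the spark-hive and spark-streaming-kafka modules on the classpath, a working Hive metastore, and JSON messages whose schema stays the same between batches. The names PersistentCount, TableName and countQuery are made up for this example; sc and kafkaParams are the ones from your code.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PersistentCount {
  // Hypothetical name for the persistent Hive table
  val TableName = "topic_name_events"

  // Builds the count query for a given table name
  def countQuery(table: String): String = s"select count(*) from $table"

  def run(sc: SparkContext, kafkaParams: Map[String, String]): Unit = {
    // Assumption: a HiveContext is used so that saveAsTable writes to the metastore
    val hiveContext = new HiveContext(sc)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(".")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic_name"))

    stream.map(_._2).foreachRDD { rdd =>
      // Skip empty batches so no schema is inferred from an empty RDD
      if (!rdd.isEmpty()) {
        // Append this batch to the persistent table instead of registering a temp table
        hiveContext.read.json(rdd).write.mode(SaveMode.Append).saveAsTable(TableName)
        // The count now covers every batch appended so far, not only the last 5 seconds
        val total = hiveContext.sql(countQuery(TableName)).first().getLong(0)
        println(s"messages so far: $total")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

You would call PersistentCount.run(sc, kafkaParams) from your driver. The count is read on the driver with first(), so you could pass it to your own WriteToFile there instead of printing it. Note that appending may fail if a later batch infers a different JSON schema, so treat this as a starting point rather than a finished solution.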