I want to pipe my data through a Python machine learning file, predict the output, attach it to my DataFrame, and then save it. The error I am getting is:
Exception Details
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
... (the previous four frames repeat nine more times) ...
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:73)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:71)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:71)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:103)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
at StockPredictionKafkaStructuredStreaming$.loadingLinearRegressionModelPython(StockPredictionKafkaStructuredStreaming.scala:85)
at StockPredictionKafkaStructuredStreaming$.predictingPrice(StockPredictionKafkaStructuredStreaming.scala:103)
at StockPredictionKafkaStructuredStreaming$.delayedEndpoint$StockPredictionKafkaStructuredStreaming$1(StockPredictionKafkaStructuredStreaming.scala:15)
at StockPredictionKafkaStructuredStreaming$delayedInit$body.apply(StockPredictionKafkaStructuredStreaming.scala:6)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at StockPredictionKafkaStructuredStreaming$.main(StockPredictionKafkaStructuredStreaming.scala:6)
at StockPredictionKafkaStructuredStreaming.main(StockPredictionKafkaStructuredStreaming.scala)
Scala Code
import Utility.UtilityClass
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, from_json, lit}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
object StockPredictionKafkaStructuredStreaming extends App {
val brokers = args(0)
val topics = args(1)
val sparkSessionObj = UtilityClass.createSessionObject("StockPrediction")
sparkSessionObj.sparkContext.setLogLevel("ERROR")
import sparkSessionObj.implicits._
val streamedDataFrame = takingInput()
val preprocessedDataFrame = preProcessing(streamedDataFrame)
val predictedPriceDataFrame = predictingPrice(preprocessedDataFrame)
writeToOutputStream(predictedPriceDataFrame)
def takingInput(): DataFrame = {
val inputDataFrame = sparkSessionObj.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load()
inputDataFrame
}
private def creatingDataFrameFromJson(
inputDataFrame: DataFrame
): DataFrame = {
val schema = new StructType()
.add("1. open", StringType, true)
.add("2. high", StringType, true)
.add("3. low", StringType, true)
.add("4. close", StringType, true)
.add("5. volume", StringType, true)
val jsonStringDataFrame =
inputDataFrame.selectExpr("CAST(value AS STRING)").as[String]
val columnsRenamedDataFrame = jsonStringDataFrame
.select(from_json(col("value"), schema).as("jsonData"))
.select("jsonData.*")
.withColumnRenamed("1. open", "Open")
.withColumnRenamed("2. high", "High")
.withColumnRenamed("3. low", "Low")
.withColumnRenamed("4. close", "Close")
.withColumnRenamed("5. volume", "Volume")
columnsRenamedDataFrame
}
private def castingDataType(inputDataFrame: DataFrame): DataFrame = {
val castedDataFrame = inputDataFrame.select(
col("Open").cast(DoubleType),
col("High").cast(DoubleType),
col("Low").cast(DoubleType),
col("Volume").cast(DoubleType)
)
castedDataFrame
}
def preProcessing(inputDataFrame: DataFrame): DataFrame = {
val columnsRenamedDataFrame = creatingDataFrameFromJson(inputDataFrame)
val castedDataFrame = castingDataType(columnsRenamedDataFrame)
castedDataFrame
}
private def loadingLinearRegressionModelSpark(
inputDataFrame: DataFrame
): DataFrame = {
val linearRegressionModel =
PipelineModel.load("./MachineLearningModel/model")
//Applying the model to our Input DataFrame
val predictedDataFrame = linearRegressionModel.transform(inputDataFrame)
    //Returning the DataFrame with the prediction column added
predictedDataFrame
}
private def loadingLinearRegressionModelPython(
inputDataFrame: DataFrame
): DataFrame = {
val command = "python3 ./pythonFiles/StockPricePrediction.py"
    // creating an RDD from the input DataFrame, repartitioning it, and piping it through the Python command
val predictedPriceRDD =
inputDataFrame.rdd
.repartition(1)
.pipe(command)
    //Collecting the result from the output RDD
val predictedPrice = predictedPriceRDD.collect().apply(0)
val scaledPredictedPrice = BigDecimal(predictedPrice)
.setScale(2, BigDecimal.RoundingMode.HALF_UP)
.toDouble
val predictedColumnDataFrame =
inputDataFrame.withColumn("prediction", lit(scaledPredictedPrice))
println(predictedColumnDataFrame.isStreaming)
predictedColumnDataFrame.printSchema()
predictedColumnDataFrame
}
def predictingPrice(
inputDataFrame: DataFrame
): DataFrame = {
val predictedDataFrame = loadingLinearRegressionModelPython(inputDataFrame)
val predictedClosePrice = predictedDataFrame.select(
col("prediction").alias("Predicted Close Price"),
col("Open"),
col("Low"),
col("High"),
col("Volume")
)
predictedClosePrice
}
  def writeToOutputStream(inputDataFrame: DataFrame): Unit = {
    val outputQuery = inputDataFrame.writeStream
      .format("console")
      .outputMode("append")
      .start()
    outputQuery.awaitTermination()
  }
}
Try the code below.
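The exception comes from loadingLinearRegressionModelPython: it calls .rdd and collect() on a streaming DataFrame (StockPredictionKafkaStructuredStreaming.scala:85 in your trace), and those are batch actions, so the analyzer raises the error before writeStream.start() is ever reached. A minimal sketch of one way around it, assuming the rest of your object stays as it is: move the prediction into foreachBatch, where Spark hands you a static DataFrame for each micro-batch, on which .rdd, pipe, and collect are all legal.

def writeToOutputStream(inputDataFrame: DataFrame): Unit = {
  val outputQuery = inputDataFrame.writeStream
    .outputMode("append")
    .foreachBatch { (batchDataFrame: DataFrame, batchId: Long) =>
      // batchDataFrame is a static (non-streaming) DataFrame, so the
      // .rdd / pipe / collect calls inside predictingPrice are allowed here
      if (!batchDataFrame.isEmpty) {
        val predictedPriceDataFrame = predictingPrice(batchDataFrame)
        predictedPriceDataFrame.show(false)
      }
    }
    .start()
  outputQuery.awaitTermination()
}

In the main body, drop the eager call and pass the preprocessed stream straight to the sink, i.e. replace the predictingPrice/writeToOutputStream pair with writeToOutputStream(preprocessedDataFrame). The isEmpty guard is only there because your collect().apply(0) would throw on an empty micro-batch.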