I want to pipe a Python machine learning file, predict the output, attach the prediction to my DataFrame, and then save it. The error that I am getting is:

Exception Details

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:176)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
    ... (the TreeNode.foreachUp / List.foreach frames above repeat several more times as the plan tree is traversed) ...
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:73)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:71)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:71)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:103)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3198)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3196)
    at StockPredictionKafkaStructuredStreaming$.loadingLinearRegressionModelPython(StockPredictionKafkaStructuredStreaming.scala:85)
    at StockPredictionKafkaStructuredStreaming$.predictingPrice(StockPredictionKafkaStructuredStreaming.scala:103)
    at StockPredictionKafkaStructuredStreaming$.delayedEndpoint$StockPredictionKafkaStructuredStreaming$1(StockPredictionKafkaStructuredStreaming.scala:15)
    at StockPredictionKafkaStructuredStreaming$delayedInit$body.apply(StockPredictionKafkaStructuredStreaming.scala:6)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at StockPredictionKafkaStructuredStreaming$.main(StockPredictionKafkaStructuredStreaming.scala:6)
    at StockPredictionKafkaStructuredStreaming.main(StockPredictionKafkaStructuredStreaming.scala)

Scala Code

    import Utility.UtilityClass
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.sql.{DataFrame, Dataset, Row}
    import org.apache.spark.sql.functions.{col, from_json, lit}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
    object StockPredictionKafkaStructuredStreaming extends App {
      val brokers = args(0)
      val topics = args(1)
    
      val sparkSessionObj = UtilityClass.createSessionObject("StockPrediction")
      sparkSessionObj.sparkContext.setLogLevel("ERROR")
      import sparkSessionObj.implicits._
      val streamedDataFrame = takingInput()
      val preprocessedDataFrame = preProcessing(streamedDataFrame)
      val predictedPriceDataFrame = predictingPrice(preprocessedDataFrame)
      writeToOutputStream(predictedPriceDataFrame)
    
      def takingInput(): DataFrame = {
        val inputDataFrame = sparkSessionObj.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", brokers)
          .option("subscribe", topics)
          .load()
        inputDataFrame
      }
    
      private def creatingDataFrameFromJson(
          inputDataFrame: DataFrame
      ): DataFrame = {
        val schema = new StructType()
          .add("1. open", StringType, true)
          .add("2. high", StringType, true)
          .add("3. low", StringType, true)
          .add("4. close", StringType, true)
          .add("5. volume", StringType, true)
    
        val jsonStringDataFrame =
          inputDataFrame.selectExpr("CAST(value AS STRING)").as[String]
        val columnsRenamedDataFrame = jsonStringDataFrame
          .select(from_json(col("value"), schema).as("jsonData"))
          .select("jsonData.*")
          .withColumnRenamed("1. open", "Open")
          .withColumnRenamed("2. high", "High")
          .withColumnRenamed("3. low", "Low")
          .withColumnRenamed("4. close", "Close")
          .withColumnRenamed("5. volume", "Volume")
    
        columnsRenamedDataFrame
    
      }
      private def castingDataType(inputDataFrame: DataFrame): DataFrame = {
        val castedDataFrame = inputDataFrame.select(
          col("Open").cast(DoubleType),
          col("High").cast(DoubleType),
          col("Low").cast(DoubleType),
          col("Volume").cast(DoubleType)
        )
        castedDataFrame
      }
    
      def preProcessing(inputDataFrame: DataFrame): DataFrame = {
        val columnsRenamedDataFrame = creatingDataFrameFromJson(inputDataFrame)
        val castedDataFrame = castingDataType(columnsRenamedDataFrame)
        castedDataFrame
      }
    
      private def loadingLinearRegressionModelSpark(
          inputDataFrame: DataFrame
      ): DataFrame = {
        val linearRegressionModel =
          PipelineModel.load("./MachineLearningModel/model")
        //Applying the model to our Input DataFrame
        val predictedDataFrame = linearRegressionModel.transform(inputDataFrame)
        //Extracting the Predicted Close Price from the Output DataFrame
        predictedDataFrame
    
      }
    
      private def loadingLinearRegressionModelPython(
          inputDataFrame: DataFrame
      ): DataFrame = {
        val command = "python3 ./pythonFiles/StockPricePrediction.py"
    // Creating an RDD from the input DataFrame, repartitioning it, and piping each partition through the Python command
        val predictedPriceRDD =
          inputDataFrame.rdd
            .repartition(1)
            .pipe(command)
    // Collecting the result from the output RDD.
        val predictedPrice = predictedPriceRDD.collect().apply(0)
        val scaledPredictedPrice = BigDecimal(predictedPrice)
          .setScale(2, BigDecimal.RoundingMode.HALF_UP)
          .toDouble
        val predictedColumnDataFrame =
          inputDataFrame.withColumn("prediction", lit(scaledPredictedPrice))
        println(predictedColumnDataFrame.isStreaming)
        predictedColumnDataFrame.printSchema()
        predictedColumnDataFrame
      }
    
      def predictingPrice(
          inputDataFrame: DataFrame
      ): DataFrame = {
        val predictedDataFrame = loadingLinearRegressionModelPython(inputDataFrame)
        val predictedClosePrice = predictedDataFrame.select(
          col("prediction").alias("Predicted Close Price"),
          col("Open"),
          col("Low"),
          col("High"),
          col("Volume")
        )
        predictedClosePrice
      }
    
      def writeToOutputStream(inputDataFrame: DataFrame): Unit = {
        val outputQuery = inputDataFrame.writeStream
          .format("console")
          .outputMode("append")
          .start()
          .awaitTermination()
      }
    }


1 Answer

Answer by s.polam:

Try the code below.

  val streamedDataFrame = takingInput()
  val preprocessedDataFrame = preProcessing(streamedDataFrame)
  // Don't invoke predictingPrice on the streaming DataFrame here; invoke it inside foreachBatch instead, as shown below.
  //val predictedPriceDataFrame = predictingPrice(preprocessedDataFrame)
  writeToOutputStream(preprocessedDataFrame)
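
This works because predictingPrice internally calls .rdd and collect(), which are batch actions that cannot run on a streaming DataFrame. Inside foreachBatch, Spark hands you each micro-batch as a static DataFrame, so those operations become legal: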

  def writeToOutputStream(inputDataFrame: DataFrame): Unit = {
    val outputQuery = inputDataFrame
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        predictingPrice(batchDF).show(false) // predictingPrice now runs on a static micro-batch DataFrame.
      }
      .start()
    outputQuery.awaitTermination()
  }
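
Since the original goal was to save the predictions and not just print them, the same pattern extends to writing each micro-batch out. Below is a minimal sketch, assuming a CSV sink; the output and checkpoint paths (./output/predictions, ./checkpoint/stockPrediction) are placeholders, not paths from the original code:

  def writeToOutputStream(inputDataFrame: DataFrame): Unit = {
    val outputQuery = inputDataFrame
      .writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Each micro-batch is a static DataFrame, so rdd/pipe/collect are allowed here.
        val predictedBatch = predictingPrice(batchDF)
        predictedBatch.show(false)
        // Persist this batch's predictions; the path is a placeholder.
        predictedBatch.write
          .mode("append")
          .option("header", "true")
          .csv("./output/predictions")
      }
      .option("checkpointLocation", "./checkpoint/stockPrediction") // placeholder path
      .start()
    outputQuery.awaitTermination()
  }

One caveat from the question's own code: loadingLinearRegressionModelPython collects only the first piped value and attaches it to every row with lit(), so with more than one row per micro-batch you would likely want to join the piped predictions back row by row instead of broadcasting a single value.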