Mock input DStream in Apache Spark


I am trying to mock an input DStream while writing a Spark Streaming unit test. I am able to mock the RDDs, but when I convert them into a DStream, the DStream object comes up empty. I have used the following code:

val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

Any help would be highly appreciated.


1 Answer

Answer by saurabh shashank:

You can write unit tests for all of DataFrameWriter, DataFrameReader, DataStreamReader, and DataStreamWriter by mocking them.

Each sample test case below follows three steps:

  1. Mock the Spark collaborator
  2. Stub its behavior
  3. Assert on the result (or verify the interaction)

Maven-based dependencies:

<dependency>
    <groupId>org.scalatestplus</groupId>
    <artifactId>mockito-3-4_2.11</artifactId>
    <version>3.2.3.0</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-inline</artifactId>
    <version>2.13.0</version>
    <scope>test</scope>
</dependency>
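If you build with sbt rather than Maven, a rough equivalent would be the following (an assumption on my part: a Scala 2.11 build, to match the _2.11 artifact above):

// build.sbt -- assumed sbt equivalent of the Maven coordinates above
libraryDependencies ++= Seq(
  "org.scalatestplus" %% "mockito-3-4"    % "3.2.3.0" % Test,
  "org.mockito"        % "mockito-inline" % "2.13.0"  % Test
)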

Let’s use an example of a Spark class where the source is a Hive table and the sink is JDBC:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

class DummySource extends SparkPipeline {
  /**
   * Method to read the source and create a DataFrame
   *
   * @param spark : SparkSession
   * @return : DataFrame
   */
  override def read(spark: SparkSession): DataFrame = {
    spark.read.table("Table_Name").filter("_2 > 1")
  }

  /**
   * Method to transform the DataFrame
   *
   * @param df : DataFrame
   * @return : DataFrame
   */
  override def transform(df: DataFrame): DataFrame = ???

  /**
   * Method to write/save the DataFrame to a target
   *
   * @param df : DataFrame
   */
  override def write(df: DataFrame): Unit =
    df.write.jdbc("url", "targetTableName", new Properties())
}
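The SparkPipeline trait is not shown in the answer; a minimal definition that would make DummySource compile might look like this (an assumption inferred from the three overrides, not the author's code):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical trait, reconstructed from the overrides above.
trait SparkPipeline {
  def read(spark: SparkSession): DataFrame
  def transform(df: DataFrame): DataFrame
  def write(df: DataFrame): Unit
}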

Mocking Read

test("Spark read table") {
  val dummySource = new DummySource()
  val sparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("mocking spark test")
    .getOrCreate()
  val testData = Seq(("one", 1), ("two", 2))
  val df = sparkSession.createDataFrame(testData)
  df.show()
  val mockDataFrameReader = mock[DataFrameReader]
  val mockSpark = mock[SparkSession]
  when(mockSpark.read).thenReturn(mockDataFrameReader)
  when(mockDataFrameReader.table("Table_Name")).thenReturn(df)
  dummySource.read(mockSpark).count() should be(1)
}

Mocking Write

  test("Spark write") {
  val dummySource = new DummySource()
  val mockDf = mock[DataFrame]
  val mockDataFrameWriter = mock[DataFrameWriter[Row]]
  when(mockDf.write).thenReturn(mockDataFrameWriter)
  when(mockDataFrameWriter.mode(SaveMode.Append)).thenReturn(mockDataFrameWriter)
  doNothing().when(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())
  dummySource.write(df = mockDf)
}
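The write test above stubs the call chain but never asserts anything. To complete the Assertion step, you could verify the interaction with Mockito's verify (a sketch; note that two empty Properties instances compare equal, so the argument matches):

import org.mockito.Mockito.verify

// 3. Assertion: the writer was asked to save to the expected JDBC target
verify(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())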


The readStream / writeStream (streaming) mocks are covered in the reference below.

Ref: https://medium.com/walmartglobaltech/spark-mocking-read-readstream-write-and-writestream-b6fe70761242
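Coming back to the original queueStream question: a DStream built from a queue stays empty unless an output operation is registered and the StreamingContext is actually started, so that a micro-batch fires and drains the queue. A minimal end-to-end sketch (the 1-second batch interval and the sleep are illustrative choices, not the only way to synchronize):

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream-test")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = mutable.Queue[RDD[String]]()
val dstream = ssc.queueStream(lines)

// Register an output operation BEFORE starting the context,
// otherwise no batches are ever scheduled.
val results = mutable.ListBuffer[String]()
dstream.foreachRDD(rdd => results ++= rdd.collect())

ssc.start()

// Push test data; it is consumed on the next batch interval.
lines += ssc.sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

// Give the micro-batch time to run, then shut down.
Thread.sleep(2000)
ssc.stop(stopSparkContext = true, stopGracefully = true)

assert(results.nonEmpty)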