mock input dstream apache spark


I am trying to mock the input DStream while writing a unit test for a Spark Streaming job. I can mock the RDD, but when I try to convert it into a DStream, the DStream object comes up empty. I used the following code:

val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

Any help regarding the same will be highly appreciated.


There is 1 answer

saurabh shashank On

You can write unit tests (UT) for any code that uses DataFrameWriter, DataFrameReader, DataStreamReader, or DataStreamWriter by mocking those classes.

Each sample test case follows these three steps:

  1. Mock
  2. Behavior
  3. Assertion

Maven based dependencies



Let’s use an example of a Spark class where the source is Hive and the sink is JDBC:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

class DummySource extends SparkPipeline {
  /**
   * Method to read the source and create a DataFrame
   * @param spark : SparkSession
   * @return : DataFrame
   */
  override def read(spark: SparkSession): DataFrame =
    spark.read.table("Table_Name").filter("_2 > 1")

  /**
   * Method to transform the DataFrame
   * @param df : DataFrame
   * @return : DataFrame
   */
  override def transform(df: DataFrame): DataFrame = ???

  /**
   * Method to write/save the DataFrame to a target
   * @param df : DataFrame
   */
  override def write(df: DataFrame): Unit =
    df.write.jdbc("url", "targetTableName", new Properties())
}

Mocking Read

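test("Spark read table") {
  val dummySource = new DummySource()
  // A real local session is only needed to build the test DataFrame
  val sparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("mocking spark test")
    .getOrCreate()
  val testData = Seq(("one", 1), ("two", 2))
  val df = sparkSession.createDataFrame(testData)
  // 1. Mock
  val mockDataFrameReader = mock[DataFrameReader]
  val mockSpark = mock[SparkSession]
  // 2. Behavior: spark.read.table("Table_Name") returns the test DataFrame
  when(mockSpark.read).thenReturn(mockDataFrameReader)
  when(mockDataFrameReader.table("Table_Name")).thenReturn(df)
  // 3. Assertion: only ("two", 2) passes the "_2 > 1" filter
  dummySource.read(mockSpark).count() should be(1)
}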

Mocking Write

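test("Spark write") {
  val dummySource = new DummySource()
  // 1. Mock (DataFrameWriter is final in Spark 2.x/3.x, so Mockito needs its inline mock maker)
  val mockDf = mock[DataFrame]
  val mockDataFrameWriter = mock[DataFrameWriter[Row]]
  // 2. Behavior: df.write returns the mocked writer, and jdbc(...) does nothing
  when(mockDf.write).thenReturn(mockDataFrameWriter)
  doNothing().when(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())
  dummySource.write(df = mockDf)
  // 3. Assertion: the JDBC sink was called exactly once
  verify(mockDataFrameWriter, times(1)).jdbc("url", "targetTableName", new Properties())
}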


Streaming code in ref
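
For the DStream case in the question, one likely reason the stream looks empty is that DStreams are lazy: nothing is computed until an output operation (such as foreachRDD) is registered and the StreamingContext is started. The following is a minimal sketch under that assumption, not the code from the reference. The object name QueueStreamSketch, the 200 ms batch interval, and the 5 second timeout are illustrative choices.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical helper: feeds one RDD through queueStream and returns what came out.
object QueueStreamSketch {
  def run(): List[String] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("queue-stream-sketch")
    val ssc = new StreamingContext(conf, Milliseconds(200)) // assumed batch interval

    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    // Output operation: without one, the DStream is never evaluated.
    // The body of foreachRDD runs on the driver, so a local buffer is fine here.
    val collected = mutable.ArrayBuffer[String]()
    dstream.foreachRDD { rdd =>
      collected.synchronized { collected ++= rdd.collect() }
    }

    // Append data before starting; each queued RDD becomes one batch.
    lines += ssc.sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // give a few batches time to run
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    collected.synchronized { collected.toList }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In a unit test, the returned list can be compared with the expected lines. Waiting on a wall-clock timeout is the simplest option but is timing-sensitive, so a longer timeout or a polling loop may be needed on slow build machines.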

Ref :