Slow performance in Spark Streaming

I am using Spark Streaming 1.1.0 locally (not in a cluster). I created a simple app that parses the data (about 10,000 entries), stores it in a stream, and then applies some transformations to it. Here is the code:

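import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {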

    val master = "local[8]"
    val conf = new SparkConf().setAppName("Tester").setMaster(master)
    val sc = new StreamingContext(conf, Milliseconds(110000))

    val stream = sc.receiverStream(new MyReceiver("localhost", 9999))

    val parsedStream = parse(stream)

    parsedStream.foreachRDD(rdd => 
        println(rdd.first()+"\nRULE STARTS "+System.currentTimeMillis()))

    val result1 = parsedStream
       .filter(entry => entry.symbol.contains("walking") 
       && entry.symbol.contains("true") && entry.symbol.contains("id0"))
       .map(_.time)

    val result2 = parsedStream
       .filter(entry =>
        // contains, not ==: a symbol equal to "disappear" could never also contain "id0"
        entry.symbol.contains("disappear") && entry.symbol.contains("id0"))
       .map(_.time)

    val result3 = result1
      .transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2))

    result3.foreachRDD(rdd =>
        println(rdd.first()+"\nRULE ENDS "+System.currentTimeMillis()))

    sc.start()
    sc.awaitTermination()
}

def parse(stream: DStream[String]) = {

    stream.flatMap { line =>
        val entries = line.split("assert").filter(entry => !entry.isEmpty)
        entries.map { tuple =>

            val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r

            tuple match {
                case pattern(symbol, time) =>
                    new Data(symbol, time.toInt)
            }
        }
    }
}

case class Data (symbol: String, time: Int)

I use a batch duration of 110,000 milliseconds in order to receive all the data in one batch. I believed that Spark would be very fast, even locally. In this case, it takes about 3.5 seconds to execute the rule (the time between "RULE STARTS" and "RULE ENDS"). Am I doing something wrong, or is this the expected time? Any advice?

There is 1 answer

ben jarman

I was using case matching in a lot of my jobs and it killed performance, more so than when I introduced a JSON parser. Also try tweaking the batch duration on the StreamingContext; it made quite a bit of difference for me. Also, how many local workers do you have?
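
The parsing step can be checked on its own, without Spark. In the question's parse, the regex is built with .r inside entries.map, so it is recompiled for every entry, and an entry that does not match would throw a MatchError. Below is a minimal sketch of one way to restructure it, in plain Scala so it can be tried in a REPL. The names ParseSketch, Entry, parseEntry and parseLine are made up for illustration, and the nested quantifier ([0-9]+)+ is simplified to ([0-9]+). Whether this accounts for the 3.5 seconds has not been measured.

```scala
// Sketch only: plain Scala with no Spark dependency.
case class Data(symbol: String, time: Int)

object ParseSketch {
  // Compiled once per JVM, rather than once per entry as in the question's parse().
  // Assumption: ([0-9]+)+ from the question is simplified to ([0-9]+).
  private val Entry = """\s*[(](.+)[,]\s*([0-9]+)\s*[)]\s*[)]\s*[,|\.]\s*""".r

  // Returns None for an entry that does not match, instead of throwing MatchError.
  def parseEntry(entry: String): Option[Data] = entry match {
    case Entry(symbol, time) => Some(Data(symbol, time.toInt))
    case _                   => None
  }

  // Splits a line on "assert", drops empty pieces, and keeps only entries that parse.
  def parseLine(line: String): Seq[Data] =
    line.split("assert").toSeq
      .filter(_.nonEmpty)
      .flatMap(e => parseEntry(e).toList)
}
```

If it helps, this could be plugged into the job as stream.flatMap(line => ParseSketch.parseLine(line)). Timing parseLine by itself over the roughly 10,000 entries would show how much of the time goes to parsing rather than to Spark.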