Spark SQL + Streaming issues


We are trying to implement a use case using Spark Streaming and Spark SQL that allows us to run user-defined rules against some data (see below for how the data is captured and used). The idea is to use SQL to specify the rules and return the results as alerts to the users. Executing the query against each incoming event batch seems to be very slow. I would appreciate it if anyone could suggest a better approach to implementing this use case. I would also like to know whether Spark executes the SQL on the driver or on the workers. Thanks in advance. Given below are the steps we perform in order to achieve this -

1) Load the initial dataset from an external database using JdbcRDD

JavaRDD<SomeState> initialRDD = JdbcRDD.create(...);
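The create call itself is elided above; for illustration, a filled-in version might look like the following sketch. The JDBC URL, the bounded query, the partition count, and the SomeState.fromResultSet row mapper are assumptions, not the original values.

import java.sql.DriverManager;
import java.sql.ResultSet;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

JavaRDD<SomeState> initialRDD = JdbcRDD.create(
        sc,  // the JavaSparkContext, e.g. ssc.sparkContext()
        new JdbcRDD.ConnectionFactory() {
            @Override
            public java.sql.Connection getConnection() throws Exception {
                // hypothetical connection details
                return DriverManager.getConnection("jdbc:mysql://dbhost/appdb", "user", "secret");
            }
        },
        // JdbcRDD requires a query with two '?' placeholders for the partition bounds
        "SELECT * FROM some_state WHERE id >= ? AND id <= ?",
        1L, 1000000L,  // assumed lower/upper bounds of the id column
        10,            // number of partitions
        new Function<ResultSet, SomeState>() {
            @Override
            public SomeState call(ResultSet rs) throws Exception {
                return SomeState.fromResultSet(rs);  // hypothetical row mapper
            }
        });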

2) Create an incoming DStream (that captures updates to the initialized data)

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);

JavaDStream<SomeState> incomingDStream = flumeStream.map(...);
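The elided map(...) would deserialize each Flume event body into a SomeState object; a sketch, where SomeState.parse is a hypothetical helper:

import java.nio.charset.StandardCharsets;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

JavaDStream<SomeState> incomingDStream = flumeStream.map(
        new Function<SparkFlumeEvent, SomeState>() {
            @Override
            public SomeState call(SparkFlumeEvent flumeEvent) throws Exception {
                byte[] body = flumeEvent.event().getBody().array();
                return SomeState.parse(new String(body, StandardCharsets.UTF_8));
            }
        });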

3) Create a Pair DStream using the incoming DStream

JavaPairDStream<Object,SomeState> pairDStream =
            incomingDStream.mapToPair(...);
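Note that in the Java API, producing a JavaPairDStream requires mapToPair rather than map. A sketch of the elided keying step, where getId() is a hypothetical accessor:

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

JavaPairDStream<Object, SomeState> pairDStream = incomingDStream.mapToPair(
        new PairFunction<SomeState, Object, SomeState>() {
            @Override
            public Tuple2<Object, SomeState> call(SomeState state) throws Exception {
                return new Tuple2<Object, SomeState>(state.getId(), state);
            }
        });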

4) Create a Stateful DStream from the pair DStream using the initialized RDD as the base state

JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);

JavaDStream<SomeState> updatedStateDStream = statefulDStream.map(...);
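(Mapping a DStream yields another DStream, not a JavaRDD, hence the type above.) For context, the elided update function might look like the sketch below; SomeState.merge, SomeState.empty, and the keyed initialPairRDD (built from the step-1 dataset) are illustrative assumptions. The updateStateByKey overload that seeds the state from an initial RDD also takes a partitioner, and in Spark 1.x the Optional here is Guava's com.google.common.base.Optional.

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.function.Function2;

JavaPairDStream<Object, SomeState> statefulDStream = pairDStream.updateStateByKey(
        new Function2<List<SomeState>, Optional<SomeState>, Optional<SomeState>>() {
            @Override
            public Optional<SomeState> call(List<SomeState> newEvents,
                                            Optional<SomeState> currentState) {
                // Fold this batch's events for the key into its previous state.
                SomeState merged = currentState.or(SomeState.empty());
                for (SomeState event : newEvents) {
                    merged = SomeState.merge(merged, event);  // assumed merge logic
                }
                return Optional.of(merged);
            }
        },
        new HashPartitioner(ssc.sparkContext().defaultParallelism()),
        initialPairRDD);  // assumed: initialRDD keyed the same way as pairDStream

// The subsequent map(...) then just extracts the state values from the pairs:
JavaDStream<SomeState> updatedStateDStream = statefulDStream.map(
        new Function<Tuple2<Object, SomeState>, SomeState>() {
            @Override
            public SomeState call(Tuple2<Object, SomeState> pair) throws Exception {
                return pair._2();
            }
        });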

5) Run a user-driven query against the updated state based on the values in the incoming stream

updatedStateDStream.foreachRDD(new Function<JavaRDD<SomeState>, Void>() {

            @Override
            public Void call(JavaRDD<SomeState> updatedStateRDD) throws Exception {

                updatedStateRDD.count();
                SQLContext sqx = new SQLContext(updatedStateRDD.context());
                DataFrame schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
                schemaDf.registerTempTable("TEMP_TABLE");
                DataFrame results =
                        sqx.sql("SELECT col1 FROM TEMP_TABLE WHERE <condition1> AND <condition2> ...");

                //collect the results, process them, and send alerts
                ...

                return null;
            }
});

There are 2 answers

Answered by Sumit

The first step should be to identify which step is taking most of the time. Look at the Spark UI and identify which step/stage is taking the longest.

Here are a few best practices, plus my own observations, that you can consider:

  1. Use a singleton SQLContext (see the Java sketch after this list) - See example - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
  2. updateStateByKey can be a memory-intensive operation when there is a large number of keys. Check the size of the data processed by the updateStateByKey function, and whether it fits well in the given memory.
  3. How is your GC behaving?
  4. Are you really using "initialRDD"? If not, do not load it. If it is a static dataset, cache it.
  5. Check the time taken by your SQL query too.
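For point 1, a Java rendering of the singleton pattern from the linked SqlNetworkWordCount example looks roughly like this; it avoids constructing a new SQLContext on every batch:

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;

// Lazily instantiated singleton SQLContext, per the SqlNetworkWordCount example.
class JavaSQLContextSingleton {
    private static transient SQLContext instance = null;

    public static SQLContext getInstance(SparkContext sparkContext) {
        if (instance == null) {
            instance = new SQLContext(sparkContext);
        }
        return instance;
    }
}

Inside foreachRDD you would then call JavaSQLContextSingleton.getInstance(updatedStateRDD.context()) instead of new SQLContext(...).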

Here are a few more questions/areas that can help:

  1. What is the StorageLevel of your DStreams?
  2. What are the size and configuration of the cluster?
  3. Which version of Spark are you using?

Lastly - foreachRDD is an output operation that executes the given function on the driver, but the RDD actions inside that function are executed on the worker nodes.
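A minimal sketch of what runs where (someDStream and the printed message are illustrative):

someDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // This function body runs on the driver, once per batch...
        long n = rdd.count();  // ...but count() is an RDD action, executed on the workers.
        System.out.println("batch size = " + n);  // printed on the driver
        return null;
    }
});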

You may want to read this for a better explanation of output operations - http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

Answered by Prashant Agrawal

I am facing the same issue too; could you please let me know if you have found a solution? I have described my detailed use case in the post below.

Spark SQL + Window + Streming Issue - Spark SQL query is taking long to execute when running with spark streaming