We are trying to implement a use case using Spark Streaming and Spark SQL that allows us to run user-defined rules against some data (see below for how the data is captured and used). The idea is to use SQL to specify the rules and return the results as alerts to the users. Executing the query on each incoming event batch seems to be very slow. We would appreciate it if anyone could suggest a better approach to implementing this use case. Also, we would like to know whether Spark executes the SQL on the driver or on the workers. Thanks in advance. Given below are the steps we perform in order to achieve this:
1) Load the initial dataset from an external database as a JDBCRDD
JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);
2) Create an incoming DStream (that captures updates to the initialized data)
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);
JavaDStream<SomeState> incomingDStream = flumeStream.map(...);
3) Create a Pair DStream using the incoming DStream
JavaPairDStream<Object,SomeState> pairDStream =
incomingDStream.mapToPair(...);
4) Create a Stateful DStream from the pair DStream using the initialized RDD as the base state
JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);
JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);
5) Run a user-driven query against the updated state based on the values in the incoming stream
incomingDStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {
    @Override
    public Void call(JavaRDD<SomeState> events) throws Exception {
        updatedStateRDD.count();
        SQLContext sqx = new SQLContext(events.context());
        DataFrame schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
        schemaDf.registerTempTable("TEMP_TABLE");
        sqx.sql("SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...");
        //collect the results and process and send alerts
        ...
        return null;
    }
});
The first step should be to identify which step takes the most time. Check the Spark Master UI to see which step or phase is the slowest.
There are a few best practices and observations of my own that you can consider:
Here are a few more questions and areas that can help you:
Lastly, foreachRDD is an output operation that executes the given function on the driver, but that function may invoke RDD actions, and those actions are executed on the worker nodes.
You may want to read this for a better explanation of output operations: http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
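To illustrate the driver/worker split described above, here is a minimal sketch. It is not taken from the question's code: the class name ForeachRddPlacement, the helper isAlert, and the use of queueStream with integers as a stand-in for the Flume stream are all assumptions made for illustration. It also assumes Spark 1.6 or later, where foreachRDD accepts a VoidFunction.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ForeachRddPlacement {

    // Hypothetical rule standing in for a user-defined condition.
    static boolean isAlert(int value) {
        return value % 2 == 0;
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ForeachRddPlacement");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Assumption: a queue-backed test stream replaces the Flume receiver.
        Queue<JavaRDD<Integer>> queue = new LinkedList<>();
        queue.add(ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)));
        JavaDStream<Integer> stream = ssc.queueStream(queue);

        stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
            @Override
            public void call(JavaRDD<Integer> rdd) {
                // This body runs on the driver, once per batch.
                System.out.println("Batch received on the driver");

                // The filter function is shipped to the executors;
                // collect() is the action that triggers the distributed
                // job and brings the matching rows back to the driver.
                List<Integer> alerts = rdd.filter(new Function<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer value) {
                        return isAlert(value);
                    }
                }).collect();

                // Back on the driver: handle the (hopefully small) result.
                System.out.println("Alerts: " + alerts);
            }
        });

        ssc.start();
        ssc.awaitTerminationOrTimeout(5000); // run for about five seconds
        ssc.stop();
    }
}
```

With the local[2] master used here, the driver and the executor threads share one JVM, so the distinction only becomes visible on a real cluster, where the two println calls appear in the driver log while the filter work shows up as tasks on the workers.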