This question is a follow-up to State management not serializable.
I want to encapsulate state management logic.
The following represents where I am right now:
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

class StateManager(
    stream: DStream[(String, String)],
    updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
      dStream: DStream[(String, String)],
      updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ): StateManager =
    new StateManager(dStream, updateState)
}
This works fine, but it only handles DStream[(String, String)]. That is a first step towards generic state management, fit to welcome any DStream: from DStream[(Int, String)] to DStream[(String, myCustomClass)].
myState requires the update function to be a function value, not a method, in order to work (serialization).
But I face a problem: type parameters don't apply to function values in Scala.
user6910411 gave me a hint: use ClassTags with an enclosing method (Type-parameterize a DStream), but in turn it would still be a method.
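To make the limitation concrete, here is a REPL-style sketch (the names asArray/makeAsArray are mine, not from the linked answer): a def can carry type parameters and ClassTags, a val cannot; the closest workaround fixes the type when the function value is created, inside an enclosing method:

import scala.reflect.ClassTag

// A method can be type-parameterized and demand a ClassTag...
def asArray[T: ClassTag](xs: Seq[T]): Array[T] = xs.toArray

// ...but there is no `val f[T] = ...` in Scala. The closest thing is an
// enclosing method that supplies the ClassTag and fixes T at creation time:
def makeAsArray[T: ClassTag]: Seq[T] => Array[T] = _.toArray

val intsToArray: Seq[Int] => Array[Int] = makeAsArray[Int]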
Would anyone have some intel on how to overcome those difficulties?
The context: Spark 1.6.
The Spark graph (driver program):
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.DStream

object Consumer_Orchestrator {
  def main(args: Array[String]): Unit = {
    // set up configurations
    val streamingEnvironment = StreamingEnvironment(/*configurations*/)
    val kafkaStream = streamingEnvironment.stream()

    val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] =
      (key, value, state) => {/*some code*/}

    // initial state (not wired into the StateSpec yet)
    val initialState = streamingEnvironment.sparkContext.emptyRDD[(String, String)]

    val stateManager = StateManager(kafkaStream, updateStateFunction)

    val state: DStream[(String, String)] = stateManager.myState
    state.foreachRDD(_.foreach(println))

    streamingEnvironment.streamingContext.start()
    streamingEnvironment.streamingContext.awaitTermination()
  }
}
The StreamingEnvironment class that creates the StreamingContext:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
  val sparkContext = SparkContext.getOrCreate(sparkConf)
  lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))
  streamingContext.checkpoint(/*checkpoint directory*/)
  streamingContext.remember(Minutes(1))

  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)

  def stop() = sparkContext.stop()
}
object StreamingEnvironment {
  def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
  }
}
Here you are:
App.scala:
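Something along these lines fits here; this is a minimal sketch driving the generic StateManager shown below, with illustrative names and a queueStream input standing in for Kafka:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StreamingContext}

object App {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("state-manager-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint") // mapWithState requires checkpointing

    // The point of the generic version: any key/value types, here (Int, String)
    val stream = ssc.queueStream(mutable.Queue(
      ssc.sparkContext.parallelize(Seq((1, "a"), (2, "b")))
    ))

    val update: (Int, Option[String], State[String]) => Option[(Int, String)] =
      (key, value, state) => {
        val current = value.orElse(state.getOption).getOrElse("")
        state.update(current)
        Some((key, current))
      }

    StateManager(stream, update).myState.print()

    ssc.start()
    ssc.awaitTermination()
  }
}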
StateManage.scala:
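A sketch of the type-parameterized version, assuming the class keeps the same shape as in the question; the ClassTag context bounds are exactly what the pair-DStream conversion and mapWithState demand:

import scala.reflect.ClassTag
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// K = key type, V = value type, S = state type. The ClassTags for K and V
// are required by the implicit DStream -> PairDStreamFunctions conversion,
// the one for S by mapWithState itself.
class StateManager[K: ClassTag, V: ClassTag, S: ClassTag](
    stream: DStream[(K, V)],
    updateStateFunction: (K, Option[V], State[S]) => Option[(K, V)]
) {
  lazy val stateSpec = StateSpec.function(updateStateFunction)
  lazy val myState: DStream[(K, V)] = stream.mapWithState(stateSpec).map(_.get)
}

object StateManager {
  def apply[K: ClassTag, V: ClassTag, S: ClassTag](
      stream: DStream[(K, V)],
      updateStateFunction: (K, Option[V], State[S]) => Option[(K, V)]
  ): StateManager[K, V, S] =
    new StateManager(stream, updateStateFunction)
}

Since the type parameters now live on the class (and on the companion's apply), the ClassTags are in scope when the function value is passed along, which is the workaround the enclosing-method hint pointed at.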
build.sbt:
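Assuming Scala 2.10 and Spark 1.6.3 (names and versions illustrative):

name := "state-manager-demo"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)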
Directory structure:
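Assuming a standard sbt layout:

.
├── build.sbt
└── src
    └── main
        └── scala
            ├── App.scala
            └── StateManage.scala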
Example execution:
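Since the App.scala sketch above sets the master in code, a local run can be as simple as:

sbt run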
As you can see, there is no magic here. If you introduce generic arguments, you need ClassTags in the same context.