Generic state management

166 views Asked by At

This question is a follow-up to State management not serializable.


I want to encapsulate state management logic.

The following represents where I am at right now:

class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(dStream, updateState)
}

This works fine, but only allows DStream[(String,String)] to be handled, which is a first step towards generic state management, fit to welcome any DStream: from DStream[(Int,String)] to DStream[(String,myCustomClass)].

myState requires to be a value function in order to work (serialization).

But I face a problem as type parameters don't apply to function objects in scala.

user6910411 gave me a hint by using ClassTags with an enclosing method (Type-parameterize a DStream), but in turn it'd still be a method.

Would anyone have some intel on how to overcome those difficulties?


The context:

Spark 1.6

Spark Graph:

object Consumer_Orchestrator {
    def main(args: Array[String]) = {
        //setup configurations

        val streamingContext = StreamingEnvironment(/*configurations*/)

        val kafkaStream = streamingContext.stream()

        val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
        val initialState = emptyRDD

        val stateManager = StateManager(kafkaStream, updateState)
        val state: DStream[(String, String)] = stateManager.myState

        state.foreachRDD(_.foreach(println))

        myStreamingContext.start()
        myStreamingContext.awaitTermination()
    }
}

The StreamingEnvironment class to create the Streaming:

class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
    val sparkContext = spark.SparkContext.getOrCreate(sparkConf)
    lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

    mStreamingContext.checkpoint(/*directory checkpoint*/)
    mStreamingContext.remember(Minutes(1))

    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
    def stop() = sparkContext.stop()
}

object StreamingEnvironment {
    def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf

    new StreamingEnvironment(sparkConf, kafkaConf)
    }
}
1

There are 1 answers

0
zero323 On BEST ANSWER

Here you are:

  • App.scala:

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.dstream.ConstantInputDStream
    import statemanager._
    
    object App {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[*]", "generic", new SparkConf())
        val ssc = new StreamingContext(sc, Seconds(10))
        ssc.checkpoint("/tmp/chk")
    
        StateManager(
          new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b",2)))),
          (_: String, _: Option[Int], _: State[Int]) =>  Option(1)
        ).myState.print
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
  • StateManage.scala:

    package statemanager
    
    import scala.reflect.ClassTag
    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream
    
    class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
      stream: DStream[(T, U)],
      updateStateFunction: (T, Option[U], State[V]) => Option[W]
    ) {
      lazy val myState = stream.mapWithState(stateSpec).map(_.get)
      lazy val stateSpec = StateSpec.function(updateStateFunction)
    }
    
    object StateManager {
      def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
        _dStream: DStream[(T, U)],
        _updateState: (T, Option[U], State[V]) => Option[W]
      ) =
        new StateManager(_dStream, _updateState)
    }
    
  • build.sbt:

    scalaVersion := "2.11.8"
    
    val sparkVersion = "2.1.0"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion
    )
    
  • Directory structure:

    ├── App.scala
    ├── build.sbt
    └── StateManage.scala
    
  • Example execution:

    sbt run
    ...
    -------------------------------------------
     Time: 1483701790000 ms
     -------------------------------------------
    1
    1
    ...
    

As you can see there is no magic here. If you introduce generic arguments you need ClassTags in the same context.