I have a case class in Scala like this:
case class RemoteCopyGroup(
  ts: Long,
  systemId: String,
  name: String,
  id: Int,
  role: String,
  mode: String,
  remoteGroupName: String)
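object RemoteCopyGroup {
  import scala.util.Random

  // to be removed
  val arrayOfIds = Array("CZ210507H1", "CZ20030W4H", "CZ29400JBJ")

  // Picks one of the sample serial numbers at random.
  def randomSerialNumber: String = Random.shuffle(arrayOfIds.toList).head

  // Builds a random RemoteCopyGroup from one row of the rate source.
  // Rate is not shown here; it is assumed to have the rate source's
  // fields, i.e. timestamp: java.sql.Timestamp and value: Long.
  def get(x: Rate): RemoteCopyGroup = {
    RemoteCopyGroup(
      x.timestamp.getTime / 1000, // epoch seconds
      randomSerialNumber,
      Random.nextString(2),
      Random.nextInt(3),
      Random.nextString(2),
      Random.nextString(2),
      Random.nextString(2))
  }
}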
I am generating a stream of data using RateStreamSource like this:
import sparkSession.implicits._ // encoders needed by .as[Rate] and .map

// `rate` (rows per second) is defined elsewhere
val remoteCopyGroupDS: Dataset[(String, RemoteCopyGroup)] = sparkSession
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", rate)
  .load()
  .as[Rate]
  .filter(_.value % 10 == 0)
  .map(RemoteCopyGroup.get)
  .map(rcg => rcg.systemId -> rcg)
I want to do stateful operations on remoteCopyGroupDS, but I cannot use methods like mapWithState because remoteCopyGroupDS is a Dataset, not a DStream.
Is there a way to generate a DStream that continuously emits data, or to convert the current Dataset (remoteCopyGroupDS) to a DStream?
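The "rate" format used here is Spark's built-in rate source for Structured Streaming. It generates rows of (timestamp, value) at the configured rowsPerSecond and has nothing to do with Kafka. A DStream belongs to the older Spark Streaming API: it is a continuous sequence of RDDs, one per batch interval. DStreams can be created from sources such as Kafka, Kinesis, TCP sockets, and files on HDFS-compatible file systems (and, in Spark versions before 3.0, Flume). For Kafka, the spark-streaming-kafka-0-10 integration creates the stream with KafkaUtils.createDirectStream and does not use a receiver. For a custom source, you extend Receiver and pass an instance to StreamingContext.receiverStream; SparkContext has no method for creating receivers.

A streaming Dataset cannot be converted to a DStream, because the two APIs run on different engines. To keep per-key state on remoteCopyGroupDS, stay in Structured Streaming and use groupByKey followed by mapGroupsWithState or flatMapGroupsWithState, which play the role that mapWithState plays for DStreams.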
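
For the stateful part of the question, a DStream may not be needed at all, since a streaming Dataset supports per-key state through groupByKey and mapGroupsWithState. Below is a minimal sketch of that approach, not a definitive implementation. It assumes Spark 2.2 or later on the classpath. Event is a trimmed-down stand-in for RemoteCopyGroup, and SystemCount, nextCount, updateCount and StatefulRateSketch are hypothetical names introduced only for illustration. Rate is assumed to mirror the rate source's schema. The state kept here is simply a running count of events per systemId.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Assumed shape of Rate: the columns produced by the built-in "rate" source.
case class Rate(timestamp: Timestamp, value: Long)

// Hypothetical, trimmed-down stand-in for the question's RemoteCopyGroup.
case class Event(ts: Long, systemId: String)

// Hypothetical output type: running number of events seen per systemId.
case class SystemCount(systemId: String, count: Long)

object StatefulRateSketch {
  val ids = Array("CZ210507H1", "CZ20030W4H", "CZ29400JBJ")

  // Pure part of the update: previous count (if any) plus this batch's size.
  def nextCount(previous: Option[Long], batchSize: Long): Long =
    previous.getOrElse(0L) + batchSize

  // Invoked once per key for every micro-batch that has data for that key.
  def updateCount(systemId: String,
                  events: Iterator[Event],
                  state: GroupState[Long]): SystemCount = {
    val total = nextCount(state.getOption, events.size.toLong)
    state.update(total) // persisted by Spark across micro-batches
    SystemCount(systemId, total)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stateful-rate-sketch")
      .getOrCreate()
    import spark.implicits._

    val events: Dataset[Event] = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10L)
      .load()
      .as[Rate]
      .map(r => Event(r.timestamp.getTime / 1000, ids((r.value % ids.length).toInt)))

    val counts: Dataset[SystemCount] = events
      .groupByKey(_.systemId)
      .mapGroupsWithState[Long, SystemCount](GroupStateTimeout.NoTimeout) {
        (id, evs, st) => updateCount(id, evs, st)
      }

    // mapGroupsWithState requires the Update output mode.
    counts.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

With the real data, the same pattern should apply to remoteCopyGroupDS by grouping on the tuple's first element (groupByKey(_._1)) and replacing Long with whatever state type the operation needs. flatMapGroupsWithState is the variant to reach for when a key may emit zero or several output rows per batch.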