I am trying to use Spark Structured Streaming to read data from a Kafka topic.
The message from Kafka is JSON, which I am storing in the value column of the Dataset as a String.
Sample message (just a sample; the actual JSON is complex):
{
"Name": "Bauddhik",
"Profession": "Developer"
}
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic1")
    .load()
    // Kafka delivers the value as binary; cast it to get the raw JSON string
    .selectExpr("CAST(value AS STRING)");
Now that my Dataset has a value column holding the entire JSON, I need to pick one of the fields to use as the key when storing in Redis. Suppose the field is "Name" from the JSON.
So, first I did the select below to pull the "Name" field out as a new column in my DataFrame.
Dataset<Row> df1 = df.select(
    functions.col("value"),
    functions.get_json_object(functions.col("value"), "$['Name']").as("name"));
This works fine, and now my df1 looks like:
value  | name
<Json> | Bauddhik
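(Side note: since the real JSON is complex, parsing it once against an explicit schema with from_json can be easier to maintain than JSON-path strings. A minimal sketch, with the schema fields assumed from the sample message above:)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema assumed from the sample message; extend it for the real JSON
StructType schema = new StructType()
    .add("Name", DataTypes.StringType)
    .add("Profession", DataTypes.StringType);

// Parse the whole payload once, then pull out the field by name
Dataset<Row> df1 = df.select(
    functions.col("value"),
    functions.from_json(functions.col("value"), schema).getField("Name").as("name"));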
Now I want this inserted into the Redis cache with the key as 'Bauddhik' and the value as the entire JSON, so I am using the foreachBatch option below to persist to Redis.
df1.writeStream().foreachBatch(
    new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> dataset, Long batchId) {
            dataset.write()
                .format("org.apache.spark.sql.redis")
                .option("key.coloum", **<hereistheissue>**)
                .option("table", "test")
                .mode(SaveMode.Overwrite)
                .save();
        }
    }).start();
If you look at the above code (hereistheissue), I need to pass the key as Bauddhik, which I derived earlier as a separate column in the DataFrame.
I am not able to retrieve the name column as a String so that I can pass it to the Redis cache as the key. I have tried using map and df.head().getString(1), but nothing seems to work.
Can anyone please guide me on how to read a column from a Dataset as a String and pass it to the key option when writing to the Redis cache?
You have misspelled column in your code: "key.coloum" should be "key.column".
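Also, assuming the spark-redis connector that your format string points to, key.column takes the name of the DataFrame column whose per-row value becomes the Redis key. So you can pass the column name "name" directly; there is no need to extract "Bauddhik" as a String first. A corrected sketch of your foreachBatch:

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

df1.writeStream().foreachBatch(
    new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> dataset, Long batchId) {
            dataset.write()
                .format("org.apache.spark.sql.redis")
                // "key.column" names the column whose per-row value becomes
                // the Redis key, so "name" here yields the key Bauddhik
                .option("key.column", "name")
                .option("table", "test")
                .mode(SaveMode.Overwrite)
                .save();
        }
    }).start();

With this, each row in the micro-batch should be stored under a key derived from the table name and that row's name value.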