I am trying to use Spark Streaming to read data from a Kafka topic.

Each Kafka message is a JSON document, which I store as a String in the value column of the dataset, as shown below.

Sample message (just a sample; the actual JSON is more complex):

{
    "Name": "Bauddhik",
    "Profession": "Developer"
}

// Read the topic as a stream and keep only the message value, cast to a string.
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING)");

Now that my Dataset has a value column holding the entire JSON, I need to pick one of its fields to use as the key when storing the record in Redis. Suppose that field is "Name".

So I first ran the select below to extract the "Name" field into a new column called name in my DataFrame.

Dataset<Row> df1 = df.select(functions.col("value"), functions.get_json_object(functions.col("value"), "$['Name']").as("name"));

This works fine, and now my df1 looks like this:

value         |       name
<Json>        |     Bauddhik

Now I want to insert this into the Redis cache with 'Bauddhik' as the key and the entire JSON as the value, so I am using the foreachBatch option below to persist to Redis.

df1.writeStream().foreachBatch(
    new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> dataset, Long batchId) {
            dataset.write()
                .format("org.apache.spark.sql.redis")
                .option("key.coloum", **<hereistheissue>**)
                .option("table", "test")
                .mode(SaveMode.Overwrite)
                .save();
        }
    }).start();

In the code above (see hereistheissue), I need to pass the key Bauddhik, which I derived earlier as a separate column in the DataFrame.

I am not able to retrieve the name column as a String so that I can pass it to the Redis cache as the key. I have tried using map and df.head().getString(1), but nothing seems to work.

Can anyone please advise how I can read a column from a dataset as a String and pass it to the key option when writing to the Redis cache?

There is 1 answer

coleh On

You have misspelled "column" in your code:

.option("key.coloum", **<hereistheissue>**)

It should be:

.option("key.column", **<hereistheissue>**)
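
Beyond the spelling, it may help to note that, as far as the spark-redis documentation describes it, the key.column option takes the name of a DataFrame column rather than a single string value. Passing "name" (the column derived in the question) should therefore give each row its own Redis key of the form test:Bauddhik, without pulling the value out as a String first. The plain-Java sketch below only models that naming rule so it can run without Spark or Redis; RedisKeySketch and redisKey are hypothetical names made up for this illustration and are not part of spark-redis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: models the "<table>:<value of key column>" key naming
// described in the spark-redis docs. This is NOT the library's own code.
class RedisKeySketch {

    // Hypothetical helper: derive the Redis key for one row, given the table
    // name and the NAME of the column that holds the key.
    static String redisKey(String table, String keyColumn, Map<String, String> row) {
        String keyValue = row.get(keyColumn);
        if (keyValue == null) {
            throw new IllegalArgumentException(
                "Row has no column named '" + keyColumn + "'");
        }
        return table + ":" + keyValue;
    }

    public static void main(String[] args) {
        // One row shaped like df1 in the question: the raw JSON plus the
        // extracted name column.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("value", "{\"Name\": \"Bauddhik\", \"Profession\": \"Developer\"}");
        row.put("name", "Bauddhik");

        // The option is given the column name; the key is built per row.
        System.out.println(redisKey("test", "name", row)); // prints "test:Bauddhik"
    }
}
```

In the question's code, that would presumably mean writing .option("key.column", "name") in place of the placeholder, assuming the spark-redis version in use behaves as its documentation describes.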