Spark Streaming output mode to process only new messages


I'm using nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) to connect to NATS JetStream, consume messages, and process them with Spark Java code. Below is the code snippet:

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            // .config("spark.logConf", "false")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            // .config("spark.executor.instances", "2")
            // .config("spark.cores.max", "4")
            // .config("spark.executor.memory", "2g")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream3")
            .option("nats.stream.subjects", "mysub3")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            // .option("nats.num.listeners", 2)
            // Each listener will fetch 10 messages at a time
            // .option("nats.msg.fetch.batch.size", 10)
            .load();
    System.out.println("Successfully read nats stream !");

    StreamingQuery query;
    try {
        // Print each micro-batch to the console without truncating columns
        query = df.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", false)
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

As per the NATS JetStream guide ( https://docs.nats.io/nats-concepts/jetstream/js_walkthrough ), I'm using the command below to publish messages to the stream (subject name: mysub3):

nats pub mysub3 --count=1000 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"

After publishing the messages to the NATS stream, the output of the code is:

Successfully read nats stream !
Status change nats: connection opened
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
|mysub3 |10/31/2023 - 18:04:36 +0530|publication #6 @ 2023-10-31T18:04:36+05:30|
|mysub3 |10/31/2023 - 18:04:37 +0530|publication #7 @ 2023-10-31T18:04:37+05:30|
+-------+---------------------------+------------------------------------------+

It keeps printing the same set of messages in every batch. Since I'm using outputMode("append"), I expected each batch to print only newly published messages. Instead, every batch also includes the messages that were already printed in previous batches. I also tried outputMode("update"), which gives the same output as append. How can I make sure each batch receives and prints only newly published messages?

1 answer

Answer by VGH (best answer):

I found a solution for this scenario. Messages were duplicated in every batch because they were never acknowledged to NATS after being consumed. The fix was to use a durable consumer and pass the consumer name as an option in the Spark code.

Instructions to create a durable consumer: https://docs.nats.io/nats-concepts/jetstream/js_walkthrough#3.-creating-a-consumer

Info related to the durable consumer option: https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced

The connector page describes "nats.durable.name" as follows: "Durable subscriptions allow clients to assign a durable name to a subscription when it is created. Doing this causes the NATS Streaming server to track the last acknowledged message for that clientID + durable name, so that only messages since the last acknowledged message will be delivered to the client. Obligatory configuration."

Pass the consumer name as an option using the key "nats.durable.name":

Dataset<Row> df = spark.readStream()
        .format("nats")
        .option("nats.host", "localhost")
        .option("nats.port", 4222)
        .option("nats.stream.name", "my_stream3")
        .option("nats.stream.subjects", "mysub3")
        // wait 90 seconds for an ack before resending a message
        .option("nats.msg.ack.wait.secs", 90)
        // .option("nats.num.listeners", 2)
        // Each listener will fetch 10 messages at a time
        // .option("nats.msg.fetch.batch.size", 10)
        // name of the durable consumer created beforehand
        .option("nats.durable.name", "my_consumer")
        .load();

This will make sure that only messages since the last acknowledged message will be delivered to the client.
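
To illustrate why tracking the last acknowledged message matters, here is a minimal, self-contained sketch in plain Java. It is only a toy model and not how NATS or the connector is implemented: ToyConsumer, ackFloor and simulate are hypothetical names, and the real server redelivers unacknowledged messages after the ack wait expires rather than on every fetch. It shows that without acknowledgments each batch sees all earlier messages again, while with acknowledgments each batch sees only the new one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of ack tracking; an illustration only, not the NATS implementation. */
final class AckRedeliveryDemo {

    /** Hypothetical stand-in for a consumer that remembers the last acknowledged message. */
    static final class ToyConsumer {
        private final List<String> stream = new ArrayList<>();
        private int ackFloor = 0; // number of messages acknowledged so far

        void publish(String msg) {
            stream.add(msg);
        }

        /** Delivers every message published after the last acknowledged one. */
        List<String> fetch() {
            return new ArrayList<>(stream.subList(ackFloor, stream.size()));
        }

        /** Acknowledges everything published so far. */
        void ackAll() {
            ackFloor = stream.size();
        }
    }

    /** Publishes one message per batch and records what each batch receives. */
    static List<List<String>> simulate(boolean ackAfterBatch, int batches) {
        ToyConsumer consumer = new ToyConsumer();
        List<List<String>> seen = new ArrayList<>();
        for (int i = 1; i <= batches; i++) {
            consumer.publish("publication #" + i);
            seen.add(consumer.fetch());
            if (ackAfterBatch) {
                consumer.ackAll();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Without acks, every batch repeats the earlier messages.
        System.out.println("without ack: " + simulate(false, 3));
        // With acks, every batch contains only the newly published message.
        System.out.println("with ack:    " + simulate(true, 3));
    }
}
```

Running main prints three growing batches for the no-ack case (1, then 1-2, then 1-2-3) and three single-message batches for the ack case, which mirrors the difference between the output in the question and the behaviour after setting the durable consumer name.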