Spark Streaming - After application restart, pulled message count is not correct


I'm using the following Spark Java code to pull messages from NATS JetStream:

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("fs.s3a.access.key", "minioadmin")
            .config("fs.s3a.secret.key", "minioadmin")
            .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
            .config("fs.s3a.connection.ssl.enabled", "true")
            .config("fs.s3a.path.style.access", "true")
            .config("fs.s3a.attempts.maximum", "1")
            .config("fs.s3a.connection.establish.timeout", "5000")
            .config("fs.s3a.connection.timeout", "10000")
            .getOrCreate();

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "newstream")
            .option("nats.stream.subjects", "newsub")
            .option("nats.durable.name", "newconsumer")
            // wait 90 seconds for an ack before resending a message
            .option("nats.msg.ack.wait.secs", 90)
            .load();
    System.out.println("Successfully read nats stream !");

    df.createOrReplaceTempView("natsmessages");
    Dataset<Row> filteredDf = spark.sql("select * from natsmessages");

    StreamingQuery query;
    try {
        query = filteredDf
                .withColumn("date_only",
                        from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
                .writeStream()
                .outputMode("append")
                .partitionBy("date_only")
                .format("parquet")
                .option("path", "tmp/newoutputtest")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

For testing, I push 10k messages to NATS JetStream with the NATS CLI: nats pub newsub --count=10000 "test #{{Count}}". While the app is still processing these messages, say after it has processed 2k of them, I shut the app down (it runs locally in Eclipse) and then restart it. Since I've set a checkpoint location (.config("spark.sql.streaming.checkpointLocation","tmp/checkpoint")), I expected that on restart it would pick up and process the pending messages from where it left off. But after the app has processed everything, the count of processed messages is not exactly 10k: it is less than 10k when the time between shutdown and restart is long, and more than 10k when that time is very short!
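For reference, the processed count can be verified by reading the parquet sink back in batch mode once the stream has drained (a minimal sketch; assumes Spark is on the classpath and uses the sink path "tmp/newoutputtest" from the query above):

```java
import org.apache.spark.sql.SparkSession;

public class CountCheck {
    public static void main(String[] args) {
        // Read the streaming query's parquet output back as a static
        // dataset and count the rows that were actually written.
        SparkSession spark = SparkSession.builder()
                .appName("count-check")
                .master("local")
                .getOrCreate();
        long processed = spark.read().parquet("tmp/newoutputtest").count();
        System.out.println("Processed message count: " + processed);
        spark.stop();
    }
}
```

With exactly-once delivery this count should equal the 10k messages published, but as described above it drifts below or above depending on the restart gap.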

Whatever the case may be, at the end it should have processed exactly 10k messages, which isn't happening here. Can you please let me know how to solve this issue? Am I missing any other configuration? newconsumer is a durable consumer created in NATS.
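As an aside, the state of the stream and the durable consumer can be inspected with the NATS CLI (a diagnostic sketch; assumes the default NATS context points at the localhost:4222 server used above):

```shell
# Show stream stats (message count, first/last sequence) for "newstream".
nats stream info newstream

# Show the durable consumer's delivered, ack-pending, and redelivered
# counts; redeliveries after the 90s ack wait would show up here.
nats consumer info newstream newconsumer
```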
