I'm using nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) to connect to NATS JetStream, consume messages, and process them with Spark Java code. Below is the code snippet:
private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            .config("spark.sql.streaming.checkpointLocation", "tmp/checkpoint")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    // Read the stream from NATS JetStream via the connector
    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "newstream")
            .option("nats.stream.subjects", "newsub")
            .option("nats.durable.name", "cons1")
            .option("nats.msg.ack.wait.secs", 120)
            .load();

    StreamingQuery query;
    try {
        // Derive a date-only column and write to a Delta table partitioned by it
        query = df.withColumn("date_only",
                        from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
                .writeStream()
                .outputMode("append")
                .partitionBy("date_only")
                .format("delta")
                .option("path", "tmp/outputdelta")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
For testing, I run the command below from the NATS CLI to publish 10k messages:
nats pub newsub --count=10000 "test #{{Count}}"
The Spark Java code above consumes all 10k messages, processes them, and saves them to the output folder successfully.
What's the issue: While the Spark application is processing the messages, I stop it partway through and, after some time, restart it. Once processing is complete, counting the processed messages in the output directory gives more than 10k! In this scenario the output contained 10100 messages, whereas only 10000 messages were pushed to NATS JetStream as input, so 100 messages were duplicated.
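For reference, this is roughly how I count the rows after the run (a minimal sketch; it relies on the assumption that the "content" column is unique per message, so the gap between the total and distinct counts shows the duplicates):

    // Sketch: count rows written to the Delta output; assumes "content" is unique per message
    Dataset<Row> out = spark.read().format("delta").load("tmp/outputdelta");
    System.out.println("total rows    : " + out.count());
    System.out.println("distinct rows : " + out.select("content").distinct().count());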
Observations: In the tmp/outputdelta/_delta_log folder, the last file generated before stopping the application contains the following:
{"commitInfo":{"timestamp":1701081648739,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"193fec24-78ea-4ac0-87af-7b6232e748ff","epochId":"20"},"readVersion":19,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"100","numOutputBytes":"1753","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"36a435e7-f775-4f80-a637-48a40c23dc87"}}
{"txn":{"appId":"193fec24-78ea-4ac0-87af-7b6232e748ff","version":20,"lastUpdated":1701081648739}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-5c9955af-06c7-4da6-991a-2412f033cd38.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":1753,"modificationTime":1701081648739,"dataChange":true,"stats":"{\**"numRecords\":100**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3001\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3100\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}
And the first file generated right after restarting the Spark application contains the following:
{"commitInfo":{"timestamp":1701081772472,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","epochId":"10"},"readVersion":30,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"1000","numOutputBytes":"6250","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.3 Delta-Lake/2.3.0","txnId":"d1a5e794-6827-4c3d-bfa0-e1fd9e3474c9"}}
{"txn":{"appId":"f8d338e3-9eb2-49a2-96df-2a4c0e368285","version":10,"lastUpdated":1701081772472}}
{"add":{"path":"date_only=11%252F27%252F2023/part-00000-8e45d89b-6426-43f5-8144-ee826874dcfb.c000.snappy.parquet","partitionValues":{"date_only":"11/27/2023"},"size":6250,"modificationTime":1701081772455,"dataChange":true,"stats":"{\"**numRecords\":1000**,\"minValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #3001\"},\"maxValues\":{\"subject\":\"newsub\",\"dateTime\":\"11/27/2023 - 16:10:16 +0530\",\"content\":\"test #4000\"},\"nullCount\":{\"subject\":0,\"dateTime\":0,\"content\":0}}"}}
As you can see, before the application was stopped it had processed messages up through 3100, but after the restart it resumed from message 3001 (test #3001) instead of 3101. That is where the 100 duplicated messages in the output folder come from: the last micro-batch processed just before shutdown (messages 3001 to 3100) was reprocessed after the restart. I'm also using a durable consumer, which redelivers messages that were not acknowledged after consumption. So it looks like the acknowledgment for that last micro-batch (3001 to 3100) never happened, JetStream redelivered those messages, and they ended up duplicated in the output. How can I fix this issue?
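One workaround I'm considering is to deduplicate in the sink by replacing the plain Delta file sink with foreachBatch and a Delta MERGE keyed on the message content. This is only a minimal sketch, not something the connector provides; it assumes the "content" column uniquely identifies a message and that the Delta table at tmp/outputdelta already exists:

    import io.delta.tables.DeltaTable;
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // ... same df as above, built from spark.readStream().format("nats")...
    query = df.withColumn("date_only",
                    from_unixtime(unix_timestamp(col("dateTime"), "MM/dd/yyyy - HH:mm:ss Z"), "MM/dd/yyyy"))
            .writeStream()
            .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
                // Insert only rows whose "content" is not already in the target table,
                // so a redelivered micro-batch does not create duplicate rows.
                // Assumption: "content" is unique per message and the table exists.
                DeltaTable target = DeltaTable.forPath(batch.sparkSession(), "tmp/outputdelta");
                target.as("t")
                        .merge(batch.as("s"), "t.content = s.content")
                        .whenNotMatched()
                        .insertAll()
                        .execute();
            })
            .start();

I'm not sure whether this is the right approach, or whether the connector has a way to tie message acknowledgments to the committed micro-batch so the redelivery doesn't happen in the first place, hence the question.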