I'm using nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) to connect to NATS JetStream, consume messages, and process them in Spark Java code. Below is the code snippet:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

private static void sparkNatsTester() {
    SparkSession spark = SparkSession.builder()
            .appName("spark-with-nats")
            .master("local")
            // .config("spark.logConf", "false")
            .config("spark.jars",
                    "libs/nats-spark-connector-balanced_2.12-1.1.4.jar," + "libs/jnats-2.17.1.jar")
            // .config("spark.executor.instances", "2")
            // .config("spark.cores.max", "4")
            // .config("spark.executor.memory", "2g")
            .getOrCreate();
    System.out.println("sparkSession : " + spark);

    Dataset<Row> df = spark.readStream()
            .format("nats")
            .option("nats.host", "localhost")
            .option("nats.port", 4222)
            .option("nats.stream.name", "my_stream")
            .option("nats.stream.subjects", "my_sub")
            // seconds to wait for an ack before the server redelivers a message
            .option("nats.msg.ack.wait.secs", 1)
            // .option("nats.num.listeners", 2)
            // each listener fetches this many messages at a time
            // .option("nats.msg.fetch.batch.size", 10)
            .load();
    System.out.println("Successfully read nats stream !");

    StreamingQuery query;
    try {
        query = df.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
It successfully prints the Spark session object and "Successfully read nats stream !". After that it prints the two status lines below and then throws an exception:
Successfully read nats stream !
Status change nats: connection opened
Status change nats: connection closed
The root cause of the exception is java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
Full stack trace below:
Exception in thread "stream execution thread for [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
=== Streaming Query ===
Identifier: [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.ConsoleTable$@16cfe41b, 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, Append
+- StreamingExecutionRelation natsconnector.spark.NatsStreamingSource@4b0f9a63, [subject#3, dateTime#4, content#5]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1915)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1911)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1917)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1876)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1835)
at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:315)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.listBatches(HDFSMetadataLog.scala:327)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:253)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
... 4 more
pom.xml snippet for the Spark-related dependencies:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <spark.version>3.5.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
I'm setting the property below before executing the above method; without it, I was getting an error:
System.setProperty("hadoop.home.dir", "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1\\");
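To double-check that this path actually contains the native pieces Hadoop needs on Windows, a small check like the one below can help (just a sketch of mine; the WinutilsCheck class name is illustrative). NativeIO$Windows.access0 is implemented in hadoop.dll, so an UnsatisfiedLinkError on it usually means that DLL is missing from the bin folder, not on the library path, or built for a different Hadoop version than the 3.3.4 jars on the classpath:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class WinutilsCheck {
    public static void main(String[] args) {
        // hadoop.home.dir (set via System.setProperty above) takes precedence;
        // fall back to the HADOOP_HOME environment variable
        String home = System.getProperty("hadoop.home.dir", System.getenv("HADOOP_HOME"));
        if (home == null) {
            System.err.println("Neither hadoop.home.dir nor HADOOP_HOME is set");
            return;
        }
        Path bin = Paths.get(home, "bin");
        System.out.println("Hadoop home          : " + home);
        System.out.println("winutils.exe present : " + Files.exists(bin.resolve("winutils.exe")));
        System.out.println("hadoop.dll present   : " + Files.exists(bin.resolve("hadoop.dll")));
    }
}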
I've also set the HADOOP_HOME environment variable and added its bin folder to Path, as suggested in answers to previous questions. In the Maven dependencies I can see that the Hadoop-related jars are version 3.3.4, and I tried to match the winutils-master Hadoop version to it, but I'm still getting the same error. How can I fix this?
After adding the HADOOP_HOME environment variable and adding its bin folder to Path, I hadn't restarted my system. Once I restarted, the error above was gone! However, I then got another error, java.lang.NoSuchMethodError, which was caused by a Spark version mismatch. It was resolved after I changed the Spark version to <spark.version>3.3.3</spark.version>, as sketched below.
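A quick way to confirm the runtime versions line up (again just a sketch of mine, using only the public SparkSession API) is to print the Spark version the driver actually runs against; it should match the <spark.version> in pom.xml, and the Scala suffix in the connector jar name (_2.12) should match the Scala version the Spark build uses:

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("version-check")
                .master("local")
                .getOrCreate();
        // Should print 3.3.3 after the pom change; a NoSuchMethodError at runtime
        // typically means the connector jar was compiled against a different
        // Spark/Scala combination than the one on the classpath
        System.out.println("Spark version : " + spark.version());
        spark.stop();
    }
}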
Finally I was able to fetch messages from NATS JetStream in my Spark code!