nats-spark-connector with Java giving an error

I'm using nats-spark-connector ( https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced ) to connect to NATS JetStream, consume messages, and process them in Spark Java code. Below is the code snippet:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    private static void sparkNatsTester() {
        SparkSession spark = SparkSession.builder()
                .appName("spark-with-nats")
                .master("local")
//              .config("spark.logConf", "false")
                .config("spark.jars",
                        "libs/nats-spark-connector-balanced_2.12-1.1.4.jar,libs/jnats-2.17.1.jar")
//              .config("spark.executor.instances", "2")
//              .config("spark.cores.max", "4")
//              .config("spark.executor.memory", "2g")
                .getOrCreate();
        System.out.println("sparkSession : " + spark);

        Dataset<Row> df = spark.readStream()
                .format("nats")
                .option("nats.host", "localhost")
                .option("nats.port", 4222)
                .option("nats.stream.name", "my_stream")
                .option("nats.stream.subjects", "my_sub")
                // wait 1 second for an ack before resending a message
                .option("nats.msg.ack.wait.secs", 1)
//              .option("nats.num.listeners", 2)
                // each listener would fetch 10 messages at a time
//              .option("nats.msg.fetch.batch.size", 10)
                .load();
        System.out.println("Successfully read nats stream !");

        StreamingQuery query;
        try {
            query = df.writeStream()
                      .outputMode("append")
                      .format("console")
                      .start();
            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
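
For completeness, the stream and subject named in the options have to exist on the NATS server before the query sees any data. Below is a minimal, hypothetical setup sketch using the jnats client that is already on spark.jars; the stream name, subject, and URL are taken from the options above, everything else (class name, storage type, test payload) is an assumption:

    import io.nats.client.Connection;
    import io.nats.client.JetStream;
    import io.nats.client.JetStreamManagement;
    import io.nats.client.Nats;
    import io.nats.client.api.StorageType;
    import io.nats.client.api.StreamConfiguration;

    public class NatsStreamSetup {
        public static void main(String[] args) throws Exception {
            try (Connection nc = Nats.connect("nats://localhost:4222")) {
                // Create the stream the Spark options refer to (throws a
                // JetStreamApiException if an incompatible stream with the
                // same name already exists).
                JetStreamManagement jsm = nc.jetStreamManagement();
                jsm.addStream(StreamConfiguration.builder()
                        .name("my_stream")
                        .subjects("my_sub")
                        .storageType(StorageType.File)
                        .build());

                // Publish one test message so the console sink has
                // something to print.
                JetStream js = nc.jetStream();
                js.publish("my_sub", "hello from jnats".getBytes());
            }
        }
    }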

It successfully prints the Spark session object and "Successfully read nats stream !". After that it prints the two lines below and then throws an exception:

Successfully read nats stream !
Status change nats: connection opened
Status change nats: connection closed

The exception is caused by:

java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Full exception below:

Exception in thread "stream execution thread for [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
=== Streaming Query ===
Identifier: [id = 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, runId = f72897c4-180d-4272-abe2-df9f3838e54b]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.ConsoleTable$@16cfe41b, 3ac2d1ac-4876-4c2a-a501-9f94e7e11300, Append
+- StreamingExecutionRelation natsconnector.spark.NatsStreamingSource@4b0f9a63, [subject#3, dateTime#4, content#5]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
    at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:548)
    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1915)
    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1911)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1917)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1876)
    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1835)
    at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.list(CheckpointFileManager.scala:315)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.listBatches(HDFSMetadataLog.scala:327)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:253)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
    ... 4 more

pom.xml snippet for the Spark-related dependencies:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <spark.version>3.5.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

I'm setting the property below before executing the above method, without which it was giving an error:

System.setProperty("hadoop.home.dir", "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1\\");

I've also set the HADOOP_HOME environment variable and added its bin folder to Path, as suggested in previous questions.
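
For what it's worth, NativeIO$Windows.access0 is a native method implemented in hadoop.dll, so winutils.exe alone is not enough: hadoop.dll must sit next to it in %HADOOP_HOME%\bin, and a JVM only sees environment variables that existed before it (or the IDE/shell that launched it) started. A small hypothetical sanity check, reusing the path from the snippet above:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class WinutilsCheck {
        public static void main(String[] args) {
            // Both native files must be present for NativeIO$Windows to load.
            Path bin = Paths.get(
                    "C:\\Program Files\\Hadoop\\winutils-master\\hadoop-3.3.1", "bin");
            for (String file : new String[] {"winutils.exe", "hadoop.dll"}) {
                System.out.println(file + " present: " + Files.exists(bin.resolve(file)));
            }
            // null here means the JVM was started before HADOOP_HOME was set.
            System.out.println("HADOOP_HOME = " + System.getenv("HADOOP_HOME"));
        }
    }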

In the Maven dependencies I can see that the Hadoop-related jar version is 3.3.4. I tried to match the winutils-master Hadoop version to it, but I'm still getting the same error. Can you please let me know how to fix it?

1 answer

Best answer, from VGH:

After adding the HADOOP_HOME environment variable and adding its bin folder to Path, I hadn't restarted my system. After restarting, the above error was gone! However, I then got another error, java.lang.NoSuchMethodError, which was related to a Spark version mismatch; it was resolved after I changed the Spark version to <spark.version>3.3.3</spark.version>. Finally I was able to fetch messages from NATS JetStream into my Spark code!
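
For reference, the only change against the pom.xml from the question is the Spark version property; the Scala suffix (2.12) and the dependencies themselves stay the same:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <spark.version>3.3.3</spark.version>
    </properties>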