MqttException while connecting to ActiveMQ via Spark Structured Streaming

I am trying to connect my Spark application to ActiveMQ, but when I run the application I get this error:

[error]  at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(MqttDefaultFilePersistence.java:130)

My build.sbt is:

name := "sparkTest"

version := "0.1"

scalaVersion := "2.11.11"

//For spark
libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.4.0",
    "org.apache.spark" %% "spark-mllib" % "2.4.0",
    "org.apache.spark" %% "spark-sql" % "2.4.0",
    "org.apache.spark" %% "spark-hive" % "2.4.0",
    "org.apache.spark" %% "spark-streaming" % "2.4.0",
    "org.apache.spark" %% "spark-graphx" % "2.4.0",
//    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
//    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
)

//Bahir
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.3.0"

and my application is:


package com.activeMQ

import org.apache.spark.sql.SparkSession

object prog
{
    val spark: SparkSession = SparkSession
        .builder()
        .appName("CSVReader")
        .master("local[*]")
        .getOrCreate()
    
//    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._
    
    def main(args : Array[String]): Unit =
    {
        val df = spark
            .readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl","tcp://localhost:1883")
            .option("topic","sample_topic")
            .load()
        
        df.printSchema()
    }
}

When I switch to Spark 2.3 I do not get this error, but some of my code specifically needs Spark 2.4. What should I do?
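
One thing I notice is that my Bahir dependency is from the 2.3 line while Spark is 2.4. If that version skew is the culprit, aligning the connector with Spark (assuming a matching Bahir release exists on Maven Central) would look like:

libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.4.0"

I have not verified that this alone fixes the exception.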

EDIT: Here is the full stack trace I get when running sbt run:

[error] (run-main-0) MqttException (0)
[error] MqttException (0)
[error]         at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(MqttDefaultFilePersistence.java:130)
[error]         at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.getFiles(MqttDefaultFilePersistence.java:247)
[error]         at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.close(MqttDefaultFilePersistence.java:142)
[error]         at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.stop(MQTTStreamSource.scala:234)
[error]         at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:190)
[error]         at com.activeMQ.prog$.main(prog.scala:25)
[error]         at com.activeMQ.prog.main(prog.scala)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]         at java.lang.reflect.Method.invoke(Method.java:498)
[error]         at sbt.Run.invokeMain(Run.scala:93)
[error]         at sbt.Run.run0(Run.scala:87)
[error]         at sbt.Run.execute$1(Run.scala:65)
[error]         at sbt.Run.$anonfun$run$4(Run.scala:77)
[error]         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[error]         at sbt.util.InterfaceUtil$$anon$1.get(InterfaceUtil.scala:10)
[error]         at sbt.TrapExit$App.run(TrapExit.scala:252)
[error]         at java.lang.Thread.run(Thread.java:748)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-streams, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-executorManagement, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
        at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:181)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
        at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-streams
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-executorManagement
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkUI: Stopped Spark web UI at http://anonymouspirates:4040
[error] java.lang.RuntimeException: Nonzero exit code: 1
[error]         at sbt.Run$.executeTrapExit(Run.scala:124)
[error]         at sbt.Run.run(Run.scala:77)
[error]         at sbt.Defaults$.$anonfun$bgRunTask$5(Defaults.scala:1185)
[error]         at sbt.Defaults$.$anonfun$bgRunTask$5$adapted(Defaults.scala:1180)
[error]         at sbt.internal.BackgroundThreadPool.$anonfun$run$1(DefaultBackgroundJobService.scala:366)
[error]         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[error]         at scala.util.Try$.apply(Try.scala:209)
[error]         at sbt.internal.BackgroundThreadPool$BackgroundRunnable.run(DefaultBackgroundJobService.scala:289)
[error]         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]         at java.lang.Thread.run(Thread.java:748)
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 10 s, completed 11 Dec, 2020 4:05:27 PM
20/12/11 16:05:27 INFO DiskBlockManager: Shutdown hook called
20/12/11 16:05:27 INFO ShutdownHookManager: Shutdown hook called
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-0ab85d42-5e8b-4a06-95a9-585a205c1e27
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-ead38fea-117d-4088-b52d-ffe21fd69184/userFiles-01d00f34-115e-4723-a0b7-66514824d43d
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-ead38fea-117d-4088-b52d-ffe21fd69184

1 Answer

Answer by gtosto:

I am not sure this will actually solve the problem, but you can try explicitly setting the localStorage option in your code:

val df = spark
   .readStream
   .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
   .option("brokerUrl","tcp://localhost:1883")
   .option("topic","sample_topic")
   .option("localStorage", "/path/to/localdir")
   .load()
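
The localStorage option controls the directory used by the Paho client's MqttDefaultFilePersistence, which is the class throwing the MqttException in your stack trace, so pointing it at an existing, writable directory may work around the failing checkIsOpen call.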

Besides, if you do not strictly need reliability or fault tolerance, you can also disable file persistence entirely using the option

.option("persistence", "memory")