I am trying to connect my Spark application to ActiveMQ over MQTT, but when I run the application I get this error:
[error] at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(MqttDefaultFilePersistence.java:130)
My build.sbt is:
name := "sparkTest"
version := "0.1"
scalaVersion := "2.11.11"

// For Spark
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.4.0",
  "org.apache.spark" %% "spark-mllib"     % "2.4.0",
  "org.apache.spark" %% "spark-sql"       % "2.4.0",
  "org.apache.spark" %% "spark-hive"      % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0",
  "org.apache.spark" %% "spark-graphx"    % "2.4.0",
  // "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
  // "org.apache.spark" %% "spark-sql-kafka-0-10"  % "2.4.0",
)

// Bahir
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.3.0"
and my application is:
import org.apache.spark.sql.SparkSession

object prog {

  val spark: SparkSession = SparkSession
    .builder()
    .appName("CSVReader")
    .master("local[*]")
    .getOrCreate()

  // spark.sparkContext.setLogLevel("OFF")
  import spark.implicits._

  def main(args: Array[String]): Unit = {
    val df = spark
      .readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("brokerUrl", "tcp://localhost:1883")
      .option("topic", "sample_topic")
      .load()

    df.printSchema()
  }
}
When I switch to Spark 2.3 I do not get this error, but some of my code specifically requires Spark 2.4. What should I do?
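(By "switch to Spark 2.3" I mean changing the versions in build.sbt along the lines of the sketch below; 2.3.4 is just one example 2.3.x patch release.)

// Example only: the Spark 2.3 variant of the dependencies above.
// 2.3.4 is one 2.3.x patch release; Bahir stays at 2.3.0, which
// tracks the Spark 2.3 line.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.3.4",
  "org.apache.spark" %% "spark-sql"       % "2.3.4",
  "org.apache.spark" %% "spark-streaming" % "2.3.4"
)
libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.3.0"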
EDIT: I am adding the full stack trace, which I get when running sbt run:
[error] (run-main-0) MqttException (0)
[error] MqttException (0)
[error] at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(MqttDefaultFilePersistence.java:130)
[error] at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.getFiles(MqttDefaultFilePersistence.java:247)
[error] at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.close(MqttDefaultFilePersistence.java:142)
[error] at org.apache.bahir.sql.streaming.mqtt.MQTTStreamSource.stop(MQTTStreamSource.scala:234)
[error] at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:190)
[error] at com.activeMQ.prog$.main(prog.scala:25)
[error] at com.activeMQ.prog.main(prog.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] at sbt.Run.invokeMain(Run.scala:93)
[error] at sbt.Run.run0(Run.scala:87)
[error] at sbt.Run.execute$1(Run.scala:65)
[error] at sbt.Run.$anonfun$run$4(Run.scala:77)
[error] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[error] at sbt.util.InterfaceUtil$$anon$1.get(InterfaceUtil.scala:10)
[error] at sbt.TrapExit$App.run(TrapExit.scala:252)
[error] at java.lang.Thread.run(Thread.java:748)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-streams, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-executorManagement, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR ContextCleaner: Error in cleaning thread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:181)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
20/12/11 16:05:27 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-streams
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-executorManagement
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkContext: SparkContext already stopped.
20/12/11 16:05:27 INFO SparkUI: Stopped Spark web UI at http://anonymouspirates:4040
[error] java.lang.RuntimeException: Nonzero exit code: 1
[error] at sbt.Run$.executeTrapExit(Run.scala:124)
[error] at sbt.Run.run(Run.scala:77)
[error] at sbt.Defaults$.$anonfun$bgRunTask$5(Defaults.scala:1185)
[error] at sbt.Defaults$.$anonfun$bgRunTask$5$adapted(Defaults.scala:1180)
[error] at sbt.internal.BackgroundThreadPool.$anonfun$run$1(DefaultBackgroundJobService.scala:366)
[error] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[error] at scala.util.Try$.apply(Try.scala:209)
[error] at sbt.internal.BackgroundThreadPool$BackgroundRunnable.run(DefaultBackgroundJobService.scala:289)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 10 s, completed 11 Dec, 2020 4:05:27 PM
20/12/11 16:05:27 INFO DiskBlockManager: Shutdown hook called
20/12/11 16:05:27 INFO ShutdownHookManager: Shutdown hook called
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-0ab85d42-5e8b-4a06-95a9-585a205c1e27
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-ead38fea-117d-4088-b52d-ffe21fd69184/userFiles-01d00f34-115e-4723-a0b7-66514824d43d
20/12/11 16:05:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-ead38fea-117d-4088-b52d-ffe21fd69184
I am not sure this can actually solve the problem, but you can try to explicitly set the localStorage option in your code, pointing it at a directory your application can write to. Besides, if you do not really need reliability or fault-tolerance, you could also disable file persistence entirely; as far as I remember the Bahir docs, that is done by setting the persistence option to memory (at the cost of losing recovery on restart).
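Here is a minimal sketch of both variants, based on your code (the option names localStorage and persistence are the ones I believe the Bahir 2.3.0 MQTT source accepts, and /tmp/mqtt-persistence is only an example path):

import org.apache.spark.sql.SparkSession

object prog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSVReader")
      .master("local[*]")
      .getOrCreate()

    val df = spark
      .readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("brokerUrl", "tcp://localhost:1883")
      .option("topic", "sample_topic")
      // Variant 1: keep file persistence, but point it at an explicit,
      // writable directory ("/tmp/mqtt-persistence" is only an example).
      .option("localStorage", "/tmp/mqtt-persistence")
      // Variant 2: drop file persistence; no recovery on restart.
      // Uncomment instead of localStorage if reliability is not a concern:
      // .option("persistence", "memory")
      .load()

    df.printSchema()
  }
}

If the localStorage variant still fails, I would first check that the directory exists and is writable by the user running sbt.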