I am a beginner in Spark and want to send a dataframe from my spark-structured-streaming application to a defined topic in ActiveMQ. How can I do this?
EDIT:
My versions: ActiveMQ 5.16, Spark 2.4.0, Bahir 2.4.0, Scala 2.11.11
Currently, I am able to read data from one topic and send it to the other:
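// Needed for .as[Array[Byte]] and .map; assumes a SparkSession named spark
import spark.implicits._

val df = spark
  .readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("brokerUrl", "tcp://localhost:1883")
  .option("topic", "sample_topic")
  .option("persistence", "memory")
  .option("cleanSession", "true")
  .load()
  .select("payload")
  .as[Array[Byte]]
  // Decode explicitly as UTF-8 instead of relying on the platform default charset
  .map(payload => new String(payload, "UTF-8"))
  .toDF("payload")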
val checkpointLocation: String = "/home/wintersoldier/Desktop/dump/checkpoint"
val df2 = df
  .writeStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
  .option("brokerUrl", "tcp://localhost:1883")
  .option("topic", "sample_topic4")
  .option("checkpointLocation", checkpointLocation)
  .start()
df2.awaitTermination()
But this is only for testing purposes. I want to send a custom string to the ActiveMQ topic. The DataFrame schema is id: integer, topic: string, payload: binary, timestamp: timestamp.
So how can I convert my String to a binary payload and send it?
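To make the conversion step concrete, here is a minimal sketch in plain Scala of turning a String into bytes and back. It assumes UTF-8 encoding; the object name PayloadEncoding and the helpers toPayload and fromPayload are hypothetical, and nothing here touches Spark or the Bahir sink:

```scala
import java.nio.charset.StandardCharsets

object PayloadEncoding {
  // Encode a String as the raw bytes a binary payload column would hold (UTF-8 assumed)
  def toPayload(message: String): Array[Byte] =
    message.getBytes(StandardCharsets.UTF_8)

  // Decode the bytes back into a String using the same charset
  def fromPayload(payload: Array[Byte]): String =
    new String(payload, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val bytes = toPayload("hello ActiveMQ")
    // Round trip: prints the original message
    println(fromPayload(bytes))
  }
}
```

On a DataFrame, the analogous step would be casting a string column with col("payload").cast("binary"). Whether the Bahir MQTT sink then publishes that column as the message body is an assumption I have not verified.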