How to send data from spark-structured-streaming to a topic of ActiveMQ

419 views Asked by At

I am a beginner in Spark and want to send a dataframe from my spark-structured-streaming application to a defined topic in ActiveMQ. How can I do this?

EDIT: My versions: activeMQ-5.16, spark-2.4.0, bahir-2.4.0, scala-2.11.11

Currently, I am able to read data from one topic and send it to the other:

    val df = spark
        .readStream
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
        .option("brokerUrl","tcp://localhost:1883")
        .option("topic","sample_topic")
        .option("persistence","memory")
        .option("cleanSession", "true")
        .load()
        .select("payload")
        .as[Array[Byte]]
        .map(payload => new String(payload))
        .toDF("payload")
    
    val checkpointLocation : String = "/home/wintersoldier/Desktop/dump/checkpoint"
  
    val df2 = df
        .writeStream
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
        .option("brokerUrl","tcp://localhost:1883")
        .option("topic","sample_topic4")
        .option("checkpointLocation", checkpointLocation)
        .start
    df2.awaitTermination()

But this only for testing purpose, I want to send a custom string to the ActiveMQ topic, the dataframe schema is id : integer, topic: String, payload : binary, timestamp : timestamp so how can I convert my String to payload of binary format and send it?

0

There are 0 answers