I am trying to integrate Kafka with a Heron topology. However, I am not able to find any examples for the latest version of Heron (0.17.5). Is there an example that can be shared, or are there any suggestions on how to implement a custom Kafka Spout and Kafka Bolt?
Edit 1:
I believe KafkaSpout and KafkaBolt were intentionally deprecated in Heron to make way for the new Streamlet API. I am currently trying to see whether I can build a KafkaSource and a KafkaSink using the Streamlet API. However, I get the exception below when I try to create a KafkaConsumer within the Source.
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)
Edit 2:
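Fixed the above issue. I was initializing the KafkaConsumer in the constructor, which was wrong: Heron serializes the Source object (see Utils.serialize in the trace above), and KafkaConsumer is not Serializable. Initializing it in the setup() method instead fixed it.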
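To illustrate why moving the initialization helps, here is a minimal sketch that uses only the JDK, with no Heron or Kafka dependencies. FakeConsumer, EagerSource, LazySource and canSerialize are hypothetical names made up for this illustration. FakeConsumer stands in for KafkaConsumer, and LazySource only imitates the shape of a Source with a setup() method; it is not the actual Heron interface. Marking the field transient is an extra precaution I am assuming here, not something the fix above strictly requires.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for KafkaConsumer: deliberately NOT Serializable.
    static class FakeConsumer {
        String poll() { return "record"; }
    }

    // Mirrors the broken version: the client is created at construction time,
    // so it is part of the object graph when the source gets serialized.
    static class EagerSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeConsumer consumer = new FakeConsumer();
    }

    // Mirrors the fix: only plain configuration is stored at construction time;
    // the client is created later in setup(), after serialization has happened.
    static class LazySource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;
        // transient keeps the client out of serialization even once it is set.
        private transient FakeConsumer consumer;

        LazySource(String topic) { this.topic = topic; }

        void setup() { consumer = new FakeConsumer(); }

        String get() { return topic + ":" + consumer.poll(); }
    }

    // Returns true if plain Java serialization accepts the object,
    // false if it fails (for example with NotSerializableException).
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager: " + canSerialize(new EagerSource()));
        LazySource lazy = new LazySource("demo-topic");
        System.out.println("lazy: " + canSerialize(lazy));
        lazy.setup();
        System.out.println(lazy.get());
    }
}
```

In this sketch the eager variant fails to serialize while the lazy one succeeds, which matches the behaviour I saw with the real KafkaConsumer.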
I managed to get this working using the Streamlet API for Heron. I'm posting the code here in the hope that it helps others facing the same problem.
Kafka Source