I am very new to Kafka. I am creating two topics and publishing on these two topics from two Producers. I have one consumer which consumes the messages from both the topics. This is because I want to process according to the priority.
I am getting a stream from both the topics but as soon as I start iterating on ConsumerItreator
of any stream, it blocks there. As it's written in documentation, it will be blocked till it gets a new message.
Is any one aware of how to read from two topics and two streams from a single Kafka Consumer?
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();
while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
{
byte[] bytes = highPrioerityIterator.next().message();
Object obj = null;
CLoudDataObject thunderDataObject = null;
try
{
obj = SerializationUtils.deserialize(bytes);
if (obj instanceof CLoudDataObject)
{
thunderDataObject = (CLoudDataObject) obj;
System.out.println(thunderDataObject);
// TODO Got the Thunder object here, now write code to send it to Thunder service.
}
}
catch (Exception e)
{
}
}
If you don't want to process lower priority messages before high priority ones, how about setting consumer.timeout.ms property and catch ConsumerTimeoutException to detect that the flows for high priority reach the last message available? By default it's set -1 to block until a new message arrives. (http://kafka.apache.org/07/configuration.html)
The below explains a way to process multiple flows concurrently with different priorities.
Kafka requires multi-thread programming. In your case, the streams of the two topics need to be processed by threads for the flows. Because each thread will run independently to process messages, one blocking flow (thread) won't affect other flows.
Java's ThreadPool implementation can help the job in creating multi-thread application. You can find example implementation here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Regarding the priority of execution, you can call Thread.currentThread.setPriority method to have the proper priorities of threads based on their serving Kafka topic.