How to use Consumer API of Kafka 0.8.2?

7.8k views Asked by At

I'm getting start with the latest Kafka document http://kafka.apache.org/documentation.html. But I meet some problem when I try to use the new Consumer API. I've done the job with following steps:

1. Add a new dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2. Add configurations

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. Use KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

However, when I try to poll message from the broker, I got nothing but null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

And then I know what's wrong with the consumer after I checked the source code:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

To make matters worse, I cannot find any other useful information about the 0.8.2 API, since all usages about Kafka are not compatible with the latest version. Could anybody help me? Thanks a lot.

2

There are 2 answers

1
RobMcZag On

I am also trying to write a Consumer on top of Kafka 0.8.2.1 to read the messages produced by the new Producer.

So far what I have got is that the Producer API is ready and usable, while on the consumer side we have to wait 0.8.3, as @habsq noted and you already find out that there is some code included for the Consumer, but it is still not functional.

So the client to use (the current Client API) are the one found in the "core" project of your current Kafka version, i.e. 0.8.2.1 (better not downgrade the client to any other version).

So for now we need to import two jars: one for the "new" java clients and one for the core project, depending also on the scala version you are using (I use 2.11).

In my case I use graddle to manage dependencies so I just need to import

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

When you update dependencies it will get all the needed libraries.

If you are using a different Scala version just change the version; anyway you can find all the different version or the full pom on maven central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

If you use those Consumer implementation all the current examples should work as usual.

PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

0
Stefano Scarpanti On

Yes I can confirm that release 0.8.2.1 had problems in consuming messages. Now making a simple consumer with Java / Groovy and release 0.10.1.0, everything is working perfectly.

There is no more need for setting PARTITION_ASSIGNMENT_STRATEGY.