How to force a consumer to read a specific partition in kafka

25.2k views Asked by At

I have an application for downloading specific web-content, from a stream of URL's generated from 1 Kafka-producer. I've created a topic with 5 partitions and there are 5 kafka-consumers. However the timeout for the webpage download is 60 seconds. While one of the url is getting downloaded, the server assumes that the message is lost and resends the data to different consumers.

I've tried everything mentioned in

Kafka consumer configuration / performance issues

and

https://github.com/spring-projects/spring-kafka/issues/202

But I keep getting different errors everytime.

Is it possible to tie a specific consumer with a partition in kafka? I am using kafka-python for my application

3

There are 3 answers

0
ashdnik On BEST ANSWER

I missed on the documentation of Kafka-python. We can use TopicPartition class to assign a specific consumer with one partition.

http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
0
ppatierno On

I have never used the Python client but the Java one supports the assign method that you can use instead of the subscribe for asking to be assigned specific partitions for the topic. Of course, you lose the auto-rebalancing feature and you have to handle these use case manually.

1
GuangshengZuo On

Maybe I guess what really happens in your case. If your consumer fetch the url from kafka, and then go to download the content, and you said that it cost about 60s to do it. So your consumer block it because the downloading, and could not send heartbeat to kafka server. So kafka server think this consumer is down, so it do a group rebalance, and resend the uncommited message to other comsumers.

So there are two solutions you could try:

  1. set the configs session_timeout_ms to 60000 or bigger. the default is 30s, it is not enough for you.

  2. A better solution is using multithreading to do it. when your consumer fetch a message from kafka, and then start a new thread to download the content, it will not block the consumer.poll, so it can work well.