Dump the kafka (kafka-python) to a txt file

I need to dump the output of a Kafka consumer into an Excel file periodically. I use the following code:

from kafka import KafkaConsumer
from xlutils.copy import copy
from xlrd import open_workbook

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(["test"])

rowx = 0
colx = 0

for msg in consumer:
    book_ro = open_workbook("twitter.xls")
    book = copy(book_ro)  # creates a writable copy
    sheet1 = book.get_sheet(0)  # get the first sheet
    sheet1.write(rowx, colx, msg[6])  # msg[6] is the message value
    rowx += 1  # move to the next row so cells are not overwritten
    book.save("twitter.xls")

My issue is that this code is not efficient: for each message I have to open, write, and then save the Excel file. Is there a way to open the Excel file once, write a batch of messages, and then close it, instead of doing all of that on every iteration of the for loop? Thanks.

There is 1 answer

GuangshengZuo (best answer)

Yes, opening, writing, saving, and closing the file for each message is inefficient. You can do it in batches instead, but the batching still has to happen inside the consuming loop.

msg_buffer = []
buffer_size = 100
# rowx and colx are the counters defined in the question
for msg in consumer:
    msg_buffer.append(msg[6])  # msg[6] is the message value
    if len(msg_buffer) >= buffer_size:
        book_ro = open_workbook("twitter.xls")
        book = copy(book_ro)  # creates a writable copy
        sheet1 = book.get_sheet(0)  # get the first sheet
        for _msg in msg_buffer:
            sheet1.write(rowx, colx, _msg)
            rowx += 1  # advance so each message gets its own row
        book.save("twitter.xls")
        msg_buffer = []

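With buffer_size = 100 the workbook is opened and saved once per 100 messages instead of once per message, so the number of open/save cycles drops by a factor of 100 compared with the unbatched version.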
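
The batching idea can be sketched on its own, separate from Kafka and Excel, so it is easy to try out. BatchBuffer and flush_fn are hypothetical names used only for illustration; they are not part of kafka-python or xlwt. In the loop above, flush_fn would be whatever function opens twitter.xls, writes the rows, and saves. The sketch also calls flush() once at the end, so a partially filled buffer is not lost when the program stops.

```python
class BatchBuffer:
    """Collects items and hands them to flush_fn in batches (hypothetical helper)."""

    def __init__(self, flush_fn, batch_size=100):
        self.flush_fn = flush_fn      # called with a list of buffered items
        self.batch_size = batch_size
        self.items = []

    def add(self, item):
        self.items.append(item)
        if len(self.items) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.items:                # skip empty flushes
            self.flush_fn(self.items)
            self.items = []           # start a fresh list for the next batch


# Stand-in for "open workbook, write rows, save": just record each batch.
written_batches = []
buf = BatchBuffer(written_batches.append, batch_size=3)
for value in ["a", "b", "c", "d", "e", "f", "g"]:
    buf.add(value)
buf.flush()  # write the leftover partial batch
```

One design note: because the flush only happens inside add(), a quiet topic can leave messages sitting in the buffer until the next message arrives or until flush() is called explicitly.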

UPDATE for comment:

Yes. Usually you stay in this loop forever: internally it uses poll to fetch new messages, send heartbeats, and commit offsets. If your aim is to consume messages from this topic and save them, it should be a long-running loop.

This is how kafka-python is designed: you consume messages either by iterating over the consumer like this or by calling consumer.poll() yourself.
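
As a rough sketch of the consumer.poll() alternative: poll() returns a dict mapping each topic partition to a list of records, which already gives you a natural batch. The function name consume_in_batches, the handle_batch callback, and the FakeConsumer stand-in are all assumptions made for illustration so the snippet runs without a broker; with a real KafkaConsumer you would pass the consumer object itself.

```python
from collections import namedtuple


def consume_in_batches(consumer, handle_batch, max_polls,
                       timeout_ms=1000, max_records=100):
    """Call consumer.poll() max_polls times and pass each non-empty batch on."""
    for _ in range(max_polls):
        # poll() returns a dict: {TopicPartition: [ConsumerRecord, ...]}
        records_by_partition = consumer.poll(timeout_ms=timeout_ms,
                                             max_records=max_records)
        values = [rec.value
                  for recs in records_by_partition.values()
                  for rec in recs]
        if values:
            handle_batch(values)  # e.g. write all values to the workbook, then save


# Hypothetical stand-ins so the sketch runs without a Kafka broker.
Record = namedtuple("Record", ["value"])


class FakeConsumer:
    def __init__(self, responses):
        self._responses = list(responses)

    def poll(self, timeout_ms=0, max_records=None):
        # Return the next canned response, or an empty dict like a timed-out poll.
        return self._responses.pop(0) if self._responses else {}


fake = FakeConsumer([
    {"test-0": [Record(b"m1"), Record(b"m2")]},
    {},
    {"test-0": [Record(b"m3")]},
])
batches = []
consume_in_batches(fake, batches.append, max_polls=4)
```

In a long-running service the bounded range(max_polls) loop would typically become a while loop; it is bounded here only so the example terminates.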

As for why "for msg in consumer:" works: the consumer is an iterator object. Its class implements __iter__ and __next__, and under the hood it uses a fetcher to fetch records. For more implementation details, see https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py
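
To illustrate the iterator protocol itself, here is a toy class that can be used in a for loop for the same reason. ListBackedSource is a made-up name and is not how kafka-python is implemented; it only shows the __iter__/__next__ contract. Unlike this toy, the real consumer normally blocks waiting for more messages instead of ending the loop, unless consumer_timeout_ms is set.

```python
class ListBackedSource:
    """Toy iterator: yields items from a list, then stops (illustration only)."""

    def __init__(self, items):
        self._items = list(items)
        self._pos = 0

    def __iter__(self):
        # An iterator returns itself from __iter__.
        return self

    def __next__(self):
        # The for loop calls this repeatedly until StopIteration is raised.
        if self._pos >= len(self._items):
            raise StopIteration
        item = self._items[self._pos]
        self._pos += 1
        return item


collected = []
for msg in ListBackedSource(["m1", "m2", "m3"]):
    collected.append(msg)
```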