Let's consider this piece of code:
```python
from twisted.web import server, resource
from twisted.internet import reactor, threads
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaError
import json


# Function to handle Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)
            if msg is None:
                return
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reached the end of a partition; nothing to process
                    return
                else:
                    # Any other error is ignored here
                    return
            else:
                print("message", msg, msg.value())
                consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a thread from the reactor's thread pool
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5)

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }
    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # <-- is it a blocking call??
    start_loop()


# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')


if __name__ == '__main__':
    reactor.callWhenRunning(kafka_consumer)
    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)
    reactor.run()
```
Here I'm instantiating the `Consumer` object from confluent-kafka in the reactor thread, then leaving the subsequent `poll()` calls to `deferToThread()`.
I have a few questions regarding this:

- Is `Consumer.subscribe()` a blocking call? Should I `deferToThread` this method when invoking it?
- Will it corrupt the consumer if a consumer rebalance happens while I'm firing another call to `poll_kafka` using `deferToThread`? (As I understand it, every call made with `deferToThread` runs on a thread from the thread pool, and there is no guarantee that the same thread is used each time.) If so, is there a way to manage this? Maybe by running the whole thing in a separate Python thread and passing the consumed values back to the Twisted application?
- Or is there a way I can reuse the consumer object without corrupting it?
Note: the code is written in Python 2. It's an integration with a legacy system; porting the whole thing is not possible at the moment, and most of the other available libraries only support Python 3+.
If you want to defer work to a thread that is not the reactor thread, I would recommend using `deferToThreadPool` (https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool) with a custom `ThreadPool` (https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html) that you manage yourself, with `minthreads=1, maxthreads=1`. You do have to do a bit of annoying lifecycle management of this thread pool. In this simple example, that would just be `.start()` in `kafka_consumer` and `.stop()` in something added to `reactor.addSystemEventTrigger("before", "shutdown", ...)`.