First of all: yes, this is a legacy system that has not yet been completely ported to Python 3.
I have a Twisted program that serves some HTTP requests. For simplicity the GET handler below does nothing,
but assume that there is other complex work going on.
I'm using confluent-kafka==2.2.0 and Twisted==13.2.0.

    from twisted.internet import reactor, threads
    from twisted.web import server, resource
    from twisted.internet.task import LoopingCall
    from confluent_kafka import Consumer, KafkaError
    import json


    # Function to handle Kafka consumer
    def kafka_consumer():
        def fetch_data():
            def poll_kafka():
                msg = consumer.poll(0.1)  # Block for at most 0.1 seconds
                if msg is None:
                    return
                if msg.error():
                    # The error codes live on KafkaError, not KafkaException
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        return
                    else:
                        return
                else:
                    # Process fetched data from Kafka
                    commit_offset()  # Manually commit the offset

            def commit_offset():
                consumer.commit()  # Manually commit the offset

            # Execute Kafka polling in a thread from the reactor's thread pool
            d1 = threads.deferToThread(poll_kafka)

        def start_loop():
            lc = LoopingCall(fetch_data)
            lc.start(0.5)  # Fetch data every 0.5 seconds

        conf = {
            'bootstrap.servers': 'kafka_internal-1:29093',
            'group.id': 'your_consumer_group-2',  # Replace with your consumer group
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False  # Disable autocommit
        }
        consumer = Consumer(conf)
        consumer.subscribe(['jithin_test'])  # Replace with your Kafka topic
        start_loop()


    # Web service handler
    class WebService(resource.Resource):
        isLeaf = True

        def render_GET(self, request):
            # You can customize the response according to your needs
            response = {
                'message': 'Welcome to the Kafka consumer service!'
            }
            return json.dumps(response).encode('utf-8')


    if __name__ == '__main__':
        # Set up the Kafka consumer once the reactor is running
        reactor.callWhenRunning(kafka_consumer)
        # Run the Twisted web service
        root = WebService()
        site = server.Site(root)
        reactor.listenTCP(8180, site)  # Replace 8180 with your desired port
        reactor.run()

My initial goal is to poll Kafka every 0.5 seconds, fetch the data, and process it. I assume that under heavy load (say there are multiple endpoints and I stress-test them by bombarding them with requests) the 0.5-second LoopingCall interval is not guaranteed.
Is there a way to keep the polling interval reliable?
One option I was considering is to run a dedicated Python thread for the polling and use a Queue.Queue
to pass data between the new Kafka thread and the reactor thread.
Would it be a safe option to use it this way?
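
To make the dedicated-thread idea concrete, here is a minimal sketch of what I have in mind. It uses only the standard library so it can be run on its own: `FakeConsumer` is a hypothetical stand-in for `confluent_kafka.Consumer` (only `poll()` is mimicked), and `poll_forever`, `drain` and `demo` are names I made up for illustration. In the real program, `drain` would be called from the reactor thread, for example by the `LoopingCall`, and the messages would still need the `msg.error()` check and the offset commit.

```python
import threading
import time

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


class FakeConsumer(object):
    """Hypothetical stand-in for confluent_kafka.Consumer (poll() only)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        if self._messages:
            return self._messages.pop(0)
        time.sleep(timeout)  # emulate blocking until the timeout expires
        return None


def poll_forever(consumer, out_queue, stop_event, timeout=0.1):
    """Runs in the dedicated thread: block in poll() and hand messages over."""
    while not stop_event.is_set():
        msg = consumer.poll(timeout)
        if msg is not None:
            out_queue.put(msg)


def drain(in_queue, handle):
    """Meant for the reactor thread: takes what is queued and never blocks."""
    handled = 0
    while True:
        try:
            msg = in_queue.get_nowait()
        except queue.Empty:
            return handled
        handle(msg)
        handled += 1


def demo():
    """Drive the sketch without Twisted; a plain loop plays the reactor."""
    messages = queue.Queue()
    stop = threading.Event()
    worker = threading.Thread(
        target=poll_forever,
        args=(FakeConsumer(["a", "b", "c"]), messages, stop),
    )
    worker.daemon = True
    worker.start()

    received = []
    deadline = time.time() + 5.0
    while len(received) < 3 and time.time() < deadline:
        drain(messages, received.append)
        time.sleep(0.01)

    stop.set()
    worker.join()
    return received
```

With this layout the blocking `poll()` never runs on the reactor thread, and the reactor side only does a non-blocking `get_nowait()`. An alternative to the queue would be for the polling thread to call `reactor.callFromThread(handler, msg)`, which is Twisted's thread-safe way to schedule a call in the reactor thread.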
I know Kafka uses a poll mechanism rather than a push model, where you would subscribe by attaching a callback (say on_message)
and receive messages in that method/function.
Is there a better way to handle this?