Integrating confluent-kafka with Twisted on Python 2.7 in a legacy system


First of all, yes, it's a legacy system that has not been completely ported to Python 3 yet.

I have a Twisted program that serves HTTP requests. For simplicity, the GET handler below does nothing, but assume that the real handlers do some complex work.

I'm using confluent-kafka==2.2.0 and Twisted==13.2.0

from twisted.internet import reactor, threads
from twisted.web import server, resource
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaError
import json

# Function to handle Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)  # Block for at most 0.1 seconds
            if msg is None:
                return
            if msg.error():
                # _PARTITION_EOF is defined on KafkaError, not KafkaException
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    return
                else:
                    return
            else:
                # Process fetched data from Kafka
                commit_offset()  # Manually commit the offset

        def commit_offset():
            consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a separate thread
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5)  # Fetch data every 0.5 seconds

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',  # Replace with your consumer group
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }

    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # Replace with your Kafka topic

    start_loop()

# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')


if __name__ == '__main__':
    # Set up the Kafka consumer once the reactor is running
    # (kafka_consumer itself runs on the reactor thread)
    reactor.callWhenRunning(kafka_consumer)

    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)  # Replace 8180 with your desired port
    reactor.run()

My goal is to poll Kafka every 0.5 seconds, fetch the data and process it. I assume that under heavy load (say we have multiple endpoints and I stress-test them with a lot of requests) the 0.5-second LoopingCall interval won't be guaranteed.

Is there a way I can achieve this?

One option I was considering is a dedicated Python thread for Kafka, with a Queue.Queue to pass data between the Kafka thread and the reactor thread. Would that be safe?
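
To make the idea concrete, here is a minimal sketch of what I have in mind. FakeConsumer, kafka_worker and drain are names I made up for illustration; FakeConsumer only imitates the poll(timeout) call of confluent_kafka.Consumer so the sketch runs without a broker, and real code would also need to check msg.error() before queuing a message.

```python
import threading
import time

try:
    import Queue as queue  # Python 2.7
except ImportError:
    import queue  # Python 3


class FakeConsumer(object):
    """Hypothetical stand-in for confluent_kafka.Consumer (no broker needed)."""

    def __init__(self, payloads):
        self._payloads = list(payloads)

    def poll(self, timeout):
        # Return the next payload, or wait out the timeout and return None.
        if self._payloads:
            return self._payloads.pop(0)
        time.sleep(timeout)
        return None


def kafka_worker(consumer, out_queue, stop_event, poll_timeout=0.1):
    """Runs in its own thread: blocks in poll() and hands messages to the queue."""
    while not stop_event.is_set():
        msg = consumer.poll(poll_timeout)
        if msg is not None:
            out_queue.put(msg)


def drain(in_queue, handle):
    """Meant to run on the reactor thread; never blocks. Returns the number handled."""
    count = 0
    while True:
        try:
            msg = in_queue.get_nowait()
        except queue.Empty:
            return count
        handle(msg)
        count += 1
```

In the real service I would presumably start kafka_worker in a threading.Thread and call drain from the reactor with LoopingCall(drain, q, handle).start(0.5), so the reactor only ever does a non-blocking get_nowait() and the blocking poll() stays off the reactor thread.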

I know Kafka uses a poll mechanism, unlike a push model where you subscribe, attach a callback such as on_message, and then receive messages in that function.
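
A push-style wrapper on top of poll could look roughly like the sketch below. run_poll_loop and its parameters are hypothetical names of mine, not part of confluent-kafka or Twisted. The loop assumes only that the consumer has poll(timeout) and that each message has error(), as in confluent_kafka; call_in_reactor is whatever schedules a call on the reactor thread.

```python
import threading


def run_poll_loop(consumer, on_message, call_in_reactor, stop_event,
                  poll_timeout=0.1):
    """Poll in a worker thread and push each message to on_message.

    call_in_reactor(f, *args) must schedule f(*args) on the reactor thread;
    with Twisted that would be reactor.callFromThread.
    """
    while not stop_event.is_set():
        msg = consumer.poll(poll_timeout)
        if msg is None or msg.error():
            # Nothing fetched, or an error event; real code would log errors.
            continue
        call_in_reactor(on_message, msg)
```

With Twisted this would be started as threading.Thread(target=run_poll_loop, args=(consumer, on_message, reactor.callFromThread, stop_event)), so on_message always runs on the reactor thread and can touch Twisted objects safely.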

Is there a better way to handle this?
