Exception handling and Retry failure messages with same offset from Kafka using Python

49 views Asked by At

Exception handling and Retry failure messages with same offset from Kafka using Python.

The objective is to design exception handling and retry mechanisms based on specific scenarios encountered during message processing. These scenarios involve dealing with two main exceptions: Connection Exception and Validation Exception.

  1. Connection Exception Handling:

    • If the payload message is identified as '5', a connection exception should be raised.
    • Upon encountering the connection exception, the Lambda function should retry processing the same message. This means it must not acknowledge or commit the message for which the exception occurred.
    • The function should ensure that when a connection exception occurs, it picks up the same message again from the same offset in the Kafka topic, and keeps retrying until the message is processed and committed.
  2. Validation Exception Handling:

    • When the payload message is '10', a validation exception should be raised.
    • However, encountering the validation exception means that the Lambda function should skip retrying the current message.
    • Instead, it should proceed to the next message available in the Kafka topic, potentially at a different offset.

The ultimate goal is to ensure that the Lambda function handles these exceptions appropriately, retries only when necessary, and skips messages that encounter validation exceptions, maintaining efficient message processing from Kafka topics.


from confluent_kafka import Producer, KafkaError , Consumer
from fastapi import FastAPI
import threading
import boto3
import os
import configparser
from cache import get_or_refresh_token

app = FastAPI()


import time 
import json

class ConnectionException(Exception):
    """Signals a connection failure while handling a message.

    The consumer loop treats this as retryable: the message that triggered
    it is meant to be processed again rather than skipped.
    """

class ValidationException(Exception):
    """Signals that a message payload is invalid.

    The consumer loop treats this as non-retryable: the offending message is
    meant to be skipped so consumption can move on to the next one.
    """

# Retry budget for failed messages. Nothing in the visible code reads this
# value; -1 presumably stands for "retry without limit" -- TODO confirm.
MAX_RETRIES = -1
# Seconds to wait between retry attempts. The consumer loop reads this name,
# so it must be defined: leaving it commented out raises NameError on the
# retry path.
RETRY_INTERVAL = 5


# Settings for the module-level Kafka consumer shared by the functions below.
_consumer_settings = {
    'bootstrap.servers': KAFKA_BROKER,
    # Authenticate over TLS using SASL/SCRAM credentials.
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': username,
    'sasl.password': password,
    # NOTE(review): the group id is an empty string here -- presumably
    # redacted; a real consumer group id is needed for subscribe() and for
    # committing offsets. Confirm against the deployment configuration.
    'group.id': '',
    'auto.offset.reset': 'latest',
    # Offsets are not committed automatically; committing is left to the
    # application code.
    'enable.auto.commit': False,
}

consumer = Consumer(_consumer_settings)
consumer.subscribe([KAFKA_TOPIC])

def recreate_kafka_consumer():
    """Close the module-level consumer and replace it with a new one.

    Rebinds the global ``consumer`` to a fresh ``Consumer`` built from the
    broker/credential settings used elsewhere in this module and subscribes
    it to ``KAFKA_TOPIC``.
    """
    global consumer

    consumer.close()
    consumer = Consumer({
        'bootstrap.servers': KAFKA_BROKER,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': username,
        'sasl.password': password,
        # NOTE(review): this entry originally had no value at all, which is a
        # syntax error that prevents the module from loading. '' mirrors the
        # value used for the module-level consumer; replace it with the real
        # consumer group id so committed offsets are found after a reconnect.
        'group.id': '',
        'auto.offset.reset': 'latest',
        'enable.auto.commit': False,
    })
    consumer.subscribe([KAFKA_TOPIC])
    print("Created New Consumber connection -----------")


def consume_messages():
    """Poll Kafka forever and commit each message only once it is finished with.

    For every successfully polled message ``process_message`` is called with
    the raw payload, and the outcome decides what happens to the offset:

    * success -> the message's offset is committed;
    * ``ValidationException`` -> the message is logged, then committed so it
      is skipped and never redelivered;
    * ``ConnectionException`` -> nothing is committed; the consumer is
      rewound to the same offset, the loop sleeps for the retry interval and
      the same message is polled and processed again.

    NOTE: the two ``except`` branches can only fire if ``process_message``
    lets those exceptions propagate to this function.

    This function never returns; it is intended to run on its own thread.
    """
    # Function-scope import: the module-level confluent_kafka import does not
    # bring TopicPartition into scope.
    from confluent_kafka import TopicPartition

    # Seconds to pause before re-reading a message that hit a connection
    # failure. Honour a module-level RETRY_INTERVAL when one is defined and
    # fall back to 5 seconds otherwise, so the retry path cannot NameError.
    retry_interval = globals().get("RETRY_INTERVAL", 5)

    print("PerformanceTest: In the consume_messages function")
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # An error event carries no payload, so there is nothing to
            # process: report anything other than end-of-partition and poll
            # again.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"PerformanceTest: Error while consuming message: {msg.error()}")
            continue

        print(f'Demo: Message consumed from the topic: {msg.topic()}')
        print(f'Demo: Message consumed from the partition: {msg.partition()}')
        print(f'Demo: Message consumed from the offset: {msg.offset()}')

        try:
            process_message(msg.value())
        except ConnectionException as e:
            print(f"ConnectionException occurred: {str(e)}")
            print(f"Retrying message with offset {msg.offset()}")
            # Rewind the already-assigned partition to this message instead of
            # rebuilding the consumer: seek() takes a TopicPartition, and the
            # next poll() then returns this same message again.
            consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))
            time.sleep(retry_interval)
            continue  # deliberately skip the commit below
        except ValidationException as e:
            # Invalid payloads are not retried; fall through to the commit so
            # consumption moves on to the next message.
            print(str(e))

        # Synchronous commit of this message's position, so that a restart
        # resumes after it rather than re-reading or skipping messages.
        consumer.commit(message=msg, asynchronous=False)

        
def process_message(message):
    """Decode one Kafka payload and apply the demo failure rules.

    Args:
        message: Raw message value (``str`` or ``bytes``) expected to hold a
            JSON object with an optional ``"message"`` field.

    Raises:
        ConnectionException: If the ``"message"`` field equals ``'5'``.
        ValidationException: If the ``"message"`` field equals ``'10'``, or
            if the payload is missing, is not valid JSON, or is not a JSON
            object.

    Exceptions are deliberately allowed to propagate: catching them here
    would hide the failure from the caller, which decides whether to retry
    or skip the message.
    """
    try:
        payload_json = json.loads(message)
    except (TypeError, ValueError) as err:
        # TypeError covers a missing (None) value; ValueError covers malformed
        # JSON and undecodable bytes. Either way the payload can never succeed
        # on retry, so report it as a validation failure.
        raise ValidationException(f"Payload is not valid JSON: {message!r}") from err
    print('Demo: JSON format of the payload from Kafka is ', payload_json)

    if not isinstance(payload_json, dict):
        raise ValidationException(f"Payload is not a JSON object: {payload_json!r}")

    payload_message = payload_json.get("message")

    if payload_message == '5':
        raise ConnectionException(f"Connection Exception occurred while payload_message =     {payload_message}")
    if payload_message == '10':
        raise ValidationException(f"ValidationException occurred, Invalid message detected while payload_message = {payload_message}")
    print("Demo: JSON message of the payload from Kafka is : ", payload_message)
    
    
@app.on_event("startup")
async def startup_event():
    """Start the Kafka consumer loop on a background thread at app startup."""
    print("PerformanceTest : successfully created thread")
    # Daemon thread: it will not keep the process alive on its own.
    threading.Thread(target=consume_messages, daemon=True).start()

0

There are 0 answers