Best Practices for Using Kafka in a FastAPI Service for Periodic OTA API Calls


I am developing a FastAPI service in Python to fetch user reviews from various OTA APIs (e.g., Booking.com, MakeMyTrip.com) for my clients' hotels. The goal is to retrieve all reviews for each hotel from the different OTAs every 15 minutes. To implement this, I am considering Kafka because my manager asked me to use it.
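For concreteness, the 15-minute trigger I have in mind is roughly the sketch below. The hotel/OTA registry is a placeholder (the real list would come from our database), and it uses the producer and message classes shown further down:

import asyncio
import json

# Placeholder registry of (ota_type, platform_property_id, entity_id) tuples.
HOTEL_OTA_PAIRS = [
    ("booking", "prop-123", "hotel-1"),
    ("makemytrip", "prop-456", "hotel-2"),
]

async def schedule_review_fetches(producer):
    # producer is a LoadReviewsKafkaProducer. Every 15 minutes, enqueue one
    # message per hotel/OTA pair. The value is JSON so the consumer can parse
    # it back into MessageData.
    while True:
        for ota_type, property_id, entity_id in HOTEL_OTA_PAIRS:
            producer.produce_message(KafkaMessage(
                key=entity_id,
                value=json.dumps({
                    "ota_type": ota_type,
                    "platform_property_id": property_id,
                    "entity_id": entity_id,
                }),
            ))
        await asyncio.sleep(15 * 60)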

The suggested approach is to create both the Kafka producer and the consumer within the same service: produce a message containing the parameters needed to call an OTA API, then consume and process those messages in the same process to make the API calls.
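Concretely, "the same service" would mean wiring both sides into one FastAPI process, roughly like this minimal sketch (the topic name is invented; it reuses the scheduler above and the classes shown further down):

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Producer and consumer live in one process: the consumer starts its
    # background thread here, and the scheduler produces into the same topic.
    producer = LoadReviewsKafkaProducer(topic="ota-review-requests")
    consumer = SendReviewsKafkaConsumer(topic="ota-review-requests")
    scheduler = asyncio.create_task(schedule_review_fetches(producer))
    yield
    scheduler.cancel()
    producer.producer.close()
    consumer.consumer.close()

app = FastAPI(lifespan=lifespan)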

As a newcomer to Kafka, I'm seeking advice on whether this is a sound decision. Are there specific advantages to having the producer and consumer within the same service, or would it be more beneficial to separate them into distinct services? I want to ensure the most efficient and scalable implementation for my OTA API integration.

Any insights or best practices regarding the architecture of Kafka producers and consumers in a FastAPI service for periodic tasks like these would be greatly appreciated. Thank you!

Here is a simplified version of my current code (using kafka-python and Pydantic):

from __future__ import annotations

import json
import threading
from abc import ABC, abstractmethod

from kafka import KafkaConsumer, KafkaProducer
from pydantic import BaseModel, parse_obj_as


class KafkaProducerInterface(ABC):
    @abstractmethod
    def produce_message(self, kafka_message: KafkaMessage): ...


class KafkaBaseProducer(KafkaProducerInterface):
    def __init__(self):
        producer_config = {
            "bootstrap_servers": "your_kafka_bootstrap_servers",
            # Additional configuration options if needed
        }
        # Keys and values are plain strings, so encode both to bytes;
        # without a key_serializer, kafka-python rejects str keys.
        self.producer = KafkaProducer(
            **producer_config,
            key_serializer=str.encode,
            value_serializer=str.encode,
        )

    def produce_message(self, kafka_message: KafkaMessage):
        raise NotImplementedError  # concrete producers override this

class KafkaConsumerInterface(ABC):
    @abstractmethod
    def consume_messages(self): ...


class KafkaBaseConsumer(KafkaConsumerInterface):
    def __init__(self, topic: str):
        self.consumer: KafkaConsumer = KafkaConsumer(
            topic,
            bootstrap_servers='your_kafka_bootstrap_servers',
            group_id='your_consumer_group_id',
            auto_offset_reset='earliest',  # adjust as needed
            enable_auto_commit=True,
            value_deserializer=lambda x: x.decode('utf-8'),
        )

    def consume_messages(self):
        raise NotImplementedError  # concrete consumers override this

class SendReviewsKafkaConsumer(KafkaBaseConsumer):
    def __init__(self, topic: str):
        super().__init__(topic)
        # fastapi.BackgroundTasks is request-scoped and can't be used from
        # __init__; a daemon thread keeps the blocking kafka-python iterator
        # off the event loop instead.
        self._thread = threading.Thread(target=self.consume_messages, daemon=True)
        self._thread.start()

    def consume_messages(self):
        # Iterating the consumer already blocks waiting for new records, so no
        # outer `while True` is needed; json.loads replaces the unsafe eval().
        for message in self.consumer:
            message_data_instance: MessageData = parse_obj_as(
                MessageData, json.loads(message.value)
            )

class LoadReviewsKafkaProducer(KafkaBaseProducer):
    def __init__(self, topic: str):
        super().__init__()  # without this, self.producer is never created
        self.__topic = topic

    def produce_message(self, kafka_message: KafkaMessage):
        self.producer.send(self.__topic, key=kafka_message.key, value=kafka_message.value)
        return {"status": "Message produced successfully"}

class KafkaMessage(BaseModel):
    key: str
    value: str


class MessageData(BaseModel):
    ota_type: str
    platform_property_id: str
    entity_id: str