I am developing a FastAPI service in Python that fetches user reviews from various OTA APIs (e.g., Booking.com, MakeMyTrip.com) for my clients' hotels. The goal is to retrieve all reviews for each hotel from each OTA every 15 minutes. To implement this, I am considering Kafka, mainly because my manager asked me to use it.
The suggested approach is to create both the Kafka producer and the consumer within the same service: produce a message containing the parameters needed to call an OTA API, then consume those messages on the same server and make the API calls there.
As a newcomer to Kafka, I'm seeking advice on whether this is a sound decision. Are there specific advantages to having the producer and consumer within the same service, or would it be more beneficial to separate them into distinct services? I want to ensure the most efficient and scalable implementation for my OTA API integration.
Any insights or best practices regarding the architecture of Kafka producers and consumers in a FastAPI service for periodic tasks like these would be greatly appreciated. Thank you!
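For context, the 15-minute trigger I have in mind is roughly the following. This is only a sketch: the "load-reviews" topic name, the bootstrap servers, and the HOTELS registry are placeholders I made up.

import asyncio
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="your_kafka_bootstrap_servers",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical registry: one entry per (hotel, OTA) pair to poll.
HOTELS = [
    {"ota_type": "booking", "platform_property_id": "123", "entity_id": "h1"},
]

async def schedule_review_fetches():
    # e.g. asyncio.create_task(schedule_review_fetches()) at startup
    while True:
        for hotel in HOTELS:
            producer.send("load-reviews", value=hotel)  # one fetch request each
        producer.flush()
        await asyncio.sleep(15 * 60)  # wait 15 minutes between rounds

And here is the current draft of the producer/consumer classes: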
from __future__ import annotations  # lets methods reference models defined below

import json
import threading
from abc import ABC, abstractmethod

from kafka import KafkaConsumer, KafkaProducer
from pydantic import BaseModel, parse_obj_as


class KafkaProducerInterface(ABC):
    @abstractmethod
    def produce_message(self, kafka_message: KafkaMessage): ...
class KafkaBaseProducer(KafkaProducerInterface):
    def __init__(self):
        producer_config = {
            "bootstrap_servers": "your_kafka_bootstrap_servers",
            # Additional configuration options if needed
        }
        self.producer = KafkaProducer(
            **producer_config,
            key_serializer=str.encode,  # keys are sent as plain strings
            value_serializer=lambda v: str(v).encode("utf-8"),
        )

    def produce_message(self, kafka_message: KafkaMessage):
        pass
class KafkaConsumerInterface(ABC):
    @abstractmethod
    def consume_messages(self): ...
class KafkaBaseConsumer(KafkaConsumerInterface):
    def __init__(self, topic: str):
        self.consumer: KafkaConsumer = KafkaConsumer(
            topic,
            bootstrap_servers="your_kafka_bootstrap_servers",
            group_id="your_consumer_group_id",
            auto_offset_reset="earliest",  # adjust as needed
            enable_auto_commit=True,
            value_deserializer=lambda x: x.decode("utf-8"),
        )

    def consume_messages(self):
        pass
class SendReviewsKafkaConsumer(KafkaBaseConsumer):
    def __init__(self, topic: str):
        super().__init__(topic)
        # BackgroundTasks is request-scoped in FastAPI and cannot be used from
        # a constructor, and a blocking KafkaConsumer loop would stall the
        # event loop anyway, so run the poll loop in its own daemon thread.
        threading.Thread(target=self.consume_messages, daemon=True).start()

    def consume_messages(self):
        # Iterating a KafkaConsumer blocks and yields messages indefinitely.
        for message in self.consumer:
            # The value is a JSON string; json.loads is safe, unlike eval().
            message_data: MessageData = parse_obj_as(MessageData, json.loads(message.value))
            # ...hit the OTA API with message_data here...
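If the consumer has to live inside the FastAPI process, my understanding is that it should be started from the app's lifespan (startup) hook rather than from request-scoped BackgroundTasks. A minimal sketch, assuming the classes above and the made-up "load-reviews" topic:

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Instantiating the consumer starts its daemon polling thread.
    SendReviewsKafkaConsumer(topic="load-reviews")
    yield  # the thread dies with the process since it is a daemon

app = FastAPI(lifespan=lifespan)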
class LoadReviewsKafkaProducer(KafkaBaseProducer):
    def __init__(self, topic: str):
        super().__init__()  # without this, self.producer is never created
        self.__topic = topic

    def produce_message(self, kafka_message: KafkaMessage):
        self.producer.send(self.__topic, key=kafka_message.key, value=kafka_message.value)
        self.producer.flush()  # make sure the message actually leaves the buffer
        return {"status": "Message produced successfully"}
class KafkaMessage(BaseModel):
    key: str
    value: str


class MessageData(BaseModel):
    ota_type: str
    platform_property_id: str
    entity_id: str
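One detail I want to confirm: the consumer's json.loads only works if the value was written as JSON, which is why I build KafkaMessage.value with pydantic's .json() rather than str(). A quick round-trip check, continuing from the draft above with made-up field values:

data = MessageData(ota_type="booking", platform_property_id="123", entity_id="h1")
msg = KafkaMessage(key=data.entity_id, value=data.json())
assert parse_obj_as(MessageData, json.loads(msg.value)) == data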