Make a Fast Api app that using chromadb vector database more faster

67 views Asked by At

I have an Fast Api app, when I get any new connection, my app started to download chromadb vector database files, what make the app response slowly, I want that my app will download them only one time when the server is started something like preprocess and not in every connection. this is my main code

`import logging import os from fastapi import FastAPI, UploadFile, Form, HTTPException from fastapi.responses import JSONResponse from MedicalRecord import MedicalRecord from VoiceParser import VoiceParser from chat_gpt_service import ChatGPTService from Validator import Validator import re from db_service import MongoDBHandler import uuid from VectorDB import VectorDB

newline_pattern = re.compile(r'\n')
app = FastAPI()
voice_parser = VoiceParser()
chat_gpt = ChatGPTService()
database = MongoDBHandler("SoundHealthDB")
validator = Validator()
CHROMA_DATA_PATH = "chroma_data/"
COLLECTION_NAME = "conversation_collection"
model_name = "all-MiniLM-L6-v2"
vector_db = VectorDB(persist_directory=CHROMA_DATA_PATH, model_name=model_name, collection_name=COLLECTION_NAME)
@app.post("/insert_new_doctor")
async def insert_new_doctor(user_key: str = Form(None) ,doctor_id:str =Form(None)):    
    message = validator.check_user_key_and_doctor_id(doctor_id, user_key)
    if (message != "valid"):
        return JSONResponse(content={"error": message}, status_code=400) 
                  
    if database.user_exists(user_key, doctor_id):
            return JSONResponse(content={"error": "the user is already exist. Try again"}, status_code=400)
    
    database.insert_doctor_data(user_key, doctor_id)
    return JSONResponse(content={"message": "new doctor user created successfully"})
               
     
@app.post("/upload_voice_data")
async def upload_voice_data(file: UploadFile = Form(None) ,user_key: str = Form(None) ,doctor_id:str =Form(None), patient_id:str =Form(None), topic:str =Form(None)):   
    try:        
        message = validator.check_upload_voice_data(file, user_key, doctor_id, patient_id, topic)
        if (message != "valid"):
            return JSONResponse(content={"error": message}, status_code=400) 
        
        if not database.user_exists(user_key, doctor_id):
            return JSONResponse(content={"error": "the user is not exist"}, status_code=400)
        
        # Save the audio file locally
        try:
            upload_folder = "./"
            fileName = "temp_" + file.filename
            audio_path = os.path.join(upload_folder, fileName)            
            with open(audio_path, "wb") as f:
                f.write(await file.read())
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error saving file: {e}")         

        transcription = voice_parser.transcribe_wav_data(audio_path)           
                                 
        chat_analysis = chat_gpt.communicate_with_chatgpt(transcription)
        chat_analysis = newline_pattern.sub(' ', chat_analysis)
        generated_id = uuid.uuid4()
        conversation_id = str(generated_id)
        medical_record = MedicalRecord(user_key, doctor_id, patient_id, topic, conversation_id, transcription, chat_analysis)
        database.insert_user_data(medical_record)
        vector_db.insert_document(conversation_id, topic + chat_analysis)

        # Delete the temporary audio file
        try:
            os.remove(audio_path)
        except OSError as e:
            print(f"Error deleting temporary file: {e}")

        return JSONResponse(content={"Treatment recommendations: ": chat_analysis, "conversation id:": conversation_id})

    except HTTPException as http_exception:
        return JSONResponse(content={"error": f"HTTPException: {str(http_exception)}"}, status_code=http_exception.status_code)
    except Exception as general_exception:
        return JSONResponse(content={"error": f"Unexpected error: {str(general_exception)}"}, status_code=500)

`

this is my VectorDB code

import chromadb
from chromadb.utils import embedding_functions


class VectorDB:
    def __init__(self, persist_directory, model_name, collection_name):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=model_name
        )
        self.get_or_create_collection(collection_name)

    def get_or_create_collection(self, collection_name):
        """
        Retrieves a collection if it exists, or creates it with the specified embedding function.

        Args:
            collection_name (str): The name of the collection to retrieve or create.

        Returns:
            chromadb.Collection: The retrieved or created collection object.
        """

        try:
            # Attempt to get the collection
            self.collection = self.client.get_collection(name=collection_name)
        except Exception as e:
            # If the collection doesn't exist, create it
            self.collection = self.client.create_collection(
                name=collection_name, embedding_function=self.embedding_func
            )

    def insert_document(self, conversation_id, text):
        """
        Inserts a new document with metadata into the ChromaDB collection.

        Args:

            conversation_id: Unique identifier for the conversation.

        Returns:
            None
            :param text:
            :param conversation_id:
        """
        self.collection.add(
            documents=[text],
            ids=[conversation_id],
            metadatas=[{"hnsw:space": "cosine"}],
        )

    def find_similar_conversions(self, text, n_results=3):
        """
        Finds similar vectors in the ChromaDB collection using a query vector.

        Args:
            n_results: Number of similar vectors to retrieve (default is 5).

        Returns:
            List of dictionaries containing similar vectors and their distances.
            :param n_results:
            :param text:
        """
        query_results = self.collection.query(
            query_texts=[text],
            include=["documents", "distances"],
            n_results=n_results
        )
        result_subset = {
            "Similar treatments": query_results.get("documents")[0][1:],
            "Most similar conversions ids": query_results.get("ids")[0][1:],
            "Conversions distances": query_results.get("distances")[0][1:]
        }

        return result_subset

    def get_text_by_conversation_id(self, conversation_id):
        """
        Retrieves the vector associated with a conversation_id from the ChromaDB collection.

        Args:
            conversation_id: The conversation_id for which to retrieve the vector.

        Returns:
            The vector associated with the conversation_id or -1 if not found.
        """
        try:
            result = self.collection.get(ids=conversation_id)
            return result["documents"][0]
        except (IndexError, KeyError):
            # Conversation_id not found in the collection or conversation_vector key not present
            return None

    def clear_conversation(self):
        ids_to_delete = []
        self.collection.delete(ids=ids_to_delete)

I tried to put this piece of code in other places in my code but it didn't worked, or maybe I put it in the wrong place. I want something that will make the download only 1 time like a preprocess for all the users or another way to make this part faster.

0

There are 0 answers