I am trying to retrieve several sets of documents from Cloud Firestore, each corresponding to a different query. To speed this up, I am using Python's concurrent.futures library so that the queries are performed concurrently. However, I've come across a weird behaviour: if the queries are sent right after the app object is initialised, Firestore seems to get stuck and no response is ever returned (or at least not in a reasonable time), as if the app weren't fully initialised and couldn't handle a batch of queries arriving immediately afterwards. My workaround consists of performing a dummy query between the initialisation and the concurrent queries, and my question is simple: is this behaviour expected, or am I missing something?
This is the code that I'm using, where lines 15 to 17 (the dummy query) shouldn't be needed.
import concurrent.futures
from firebase_admin import firestore, initialize_app, credentials
print("Initializing app session with credentials")
credential_json = '/path/to/your/credentials.json'
firebase_credential = credentials.Certificate(credential_json)
# Build the named app from the service-account credential and project options.
app_options = {"projectId": "your-project-id"}
app = initialize_app(
    credential=firebase_credential,
    options=app_options,
    name="your-app-name",
)
db = firestore.client(app)
print("App initialized")
# Workaround: issue one blocking read before any concurrent reads. Without it
# the threaded fetches were observed to hang; it should not be required just
# to finish initializing the app.
warmup_ref = db.collection("a").document("b")
warmup_ref.get()
print("Dummy query")
# Function to fetch a single document
def fetch_document(doc_ref):
    """Fetch one document and return its contents.

    Args:
        doc_ref: Object exposing an ``id`` attribute and a blocking ``get()``
            method whose result has ``to_dict()`` (in this script, a
            Firestore document reference).

    Returns:
        Whatever ``to_dict()`` returns for the fetched snapshot.
    """
    print(f"Fetching document {doc_ref.id}")
    doc = doc_ref.get()
    print(f"Fetched document {doc_ref.id}")
    return doc.to_dict()
# Function to fetch multiple documents concurrently
def fetch_documents(document_refs, max_workers=None):
    """Fetch several documents concurrently using a thread pool.

    Args:
        document_refs: Iterable of references accepted by ``fetch_document``;
            each must expose an ``id`` attribute, which is used as the key
            in the returned mapping.
        max_workers: Optional upper bound on the number of worker threads.
            ``None`` (the default) lets ``ThreadPoolExecutor`` pick its own
            default, which matches the previous behaviour.

    Returns:
        A dict mapping each reference's ``id`` to the value returned by
        ``fetch_document``, or to ``str(exc)`` if fetching that reference
        raised an exception.
    """
    print("Initializing thread pool")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submitting tasks for fetching documents concurrently
        print("Processing document list")
        future_to_doc = {
            executor.submit(fetch_document, doc_ref): doc_ref
            for doc_ref in document_refs
        }
        # Collecting results in completion order, not submission order
        results = {}
        for future in concurrent.futures.as_completed(future_to_doc):
            print("Document/job processed")
            doc_ref = future_to_doc[future]
            try:
                print(f"Saving processing results for {doc_ref.id}")
                results[doc_ref.id] = future.result()
            except Exception as exc:
                # One failed fetch must not abort the others, so the error
                # is recorded in place of the document data.
                print(f"Error in document {doc_ref.id}: {exc}")
                results[doc_ref.id] = str(exc)  # Record any exception that occurred
        print("Document list processing complete")
        print("Closing thread pool")
    return results
doc_ids = ["doc_id_1", "doc_id_2", "doc_id_3"]
# List of document references
refs = [db.collection('collection_name').document(doc_id) for doc_id in doc_ids]
# Printing results
for doc_id, data in fetch_documents(refs).items():
    if isinstance(data, dict):
        print(f"Document {doc_id}: {data}")
    elif data is None:
        # A snapshot's to_dict() yields None when the document does not
        # exist; that is not a fetch error, so report it separately.
        print(f"Document {doc_id} does not exist")
    else:
        # Anything else is the stringified exception recorded for this id.
        print(f"Error fetching document {doc_id}: {data}")