InvalidRequestError during concurrent requests with FastAPI, aiosqlite, and SQLAlchemy

83 views Asked by At

I am encountering an issue with a FastAPI application using aiosqlite and SQLAlchemy when subjected to a high number of concurrent requests, specifically during a penetration test with 100 parallel queries using Locust. The application works well with a few parallel requests, but approximately 20% of the queries fail under heavy load.

import uuid
import asyncio
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class testdb(Base):
    __tablename__ = "testdb"
    id = Column(Integer, primary_key=True, index=True, nullable=False)
    data = Column(String, index=True, nullable=False)


async_engine:AsyncEngine = create_async_engine("sqlite+aiosqlite:///./test.db", echo=True, future=True, poolclass=SingletonThreadPool)


async def getsession() -> AsyncSession:
    async_session = sessionmaker(bind=async_engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as newsession:
        yield newsession

app = FastAPI()
lock = asyncio.Lock()

@app.post('/save')
async def save(session:AsyncSession=Depends(getsession)):
    new = testdb(data=uuid.uuid4().hex)
    session.add(new)
    await session.commit()
    await session.refresh(new)
    return new

@app.on_event('startup')
async def onstartup():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

Details:

  • The issue occurs during the execution of the /save endpoint, specifically when calling await session.refresh(new) after committing the session.

  • I am using an AsyncSession and an AsyncEngine with a SingletonThreadPool for database connections.

The error traceback I receive is as follows:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 419, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/routing.py", line 758, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/routing.py", line 778, in app
    await route.handle(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/routing.py", line 299, in handle
    await self.app(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/routing.py", line 79, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/starlette/routing.py", line 74, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 299, in app
    raise e
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 294, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/test_sqlalchemy.py", line 34, in save
    await session.refresh(new)
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 327, in refresh
    await greenlet_spawn(
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 202, in greenlet_spawn
    result = context.switch(value)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/xxxx/testdb.nosync/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3140, in refresh
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Could not refresh instance '<testdb at 0x108ed9410>'

How to fix this behavior?

I conducted a penetration test on the /save endpoint using Locust with 100 parallel user sessions and anticipated a seamless execution without any tracebacks. The module versions used for the test are as follows:

- uvicorn[standard]==0.27.0

- fastapi==0.109.2

- aiosqlite==0.19.0

- sqlalchemy[asyncio]==2.0.25

Python Version 3.11

Despite these efforts, I encountered the same error consistently. I also experimented with a version utilizing SQLModel, as well as various combinations of older versions of FastAPI and SQLAlchemy, all resulting in the persistence of the issue.

I tested with Linux Ubuntu 22.04. and Mac OS on a MacBook Air M2.

1

There are 1 answers

0
Andreas On

By including connect_args={'timeout': 120} in the create_async_engine() function, the database successfully manages the penetration test without encountering any errors. No RequestError or locked Database Error occurs.

On a MacBook Air M2, performing simultaneous writes by 100 users to the database yields 500 Requests/s, as tested with Locust.

import uuid
import asyncio
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class testdb(Base):
    __tablename__ = "testdb"
    id = Column(Integer, primary_key=True, index=True, nullable=False)
    data = Column(String, index=True, nullable=False)


async_engine:AsyncEngine = create_async_engine("sqlite+aiosqlite:///./test.db", poolclass=QueuePool, max_overflow=-1 ,connect_args={'timeout': 120})


async def getsession() -> AsyncSession:
    async_session = sessionmaker(bind=async_engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as newsession:
        yield newsession

app = FastAPI()

@app.post('/save')
async def save(session:AsyncSession=Depends(getsession)):
    new = testdb(data=uuid.uuid4().hex)
    session.add(new)
    await session.commit()
    await session.refresh(new)
    return new

@app.on_event('startup')
async def onstartup():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)