- asyncpg version: 0.29.0
- PostgreSQL version: "PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit"
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Nope, using Dockerized container
- Python version: 3.11.5
- Platform: MacBook-Air Darwin Kernel Version 21.1.0: Wed Oct 13 17:33:24 PDT 2021; root:xnu-8019.41.5~1/RELEASE_ARM64_T8101 arm64
- Do you use pgbouncer?: Nope
- Did you install asyncpg with pip?: Yes
- If you built asyncpg locally, which version of Cython did you use?: n/a
- Can the issue be reproduced under both asyncio and uvloop?: Nope
I have a singleton for a Database entity, which is then used to write some primitive data. When I run the write function 1000 times concurrently using asyncio.gather(), the database reports more connections than the max_size of the asyncpg pool. For example, during one test there were 857 active database connections, but only 62 active pool connections. No other clients/operations were running during the test. When I do the same thing with uvloop, it simply crashes with ConnectionResetError: [Errno 54] Connection reset by peer if I try to run more tasks than the size of the pool.

Is this normal pool behavior?
I use the code below (the write function is simplified, though).

The database code:
import os

import asyncpg


class Database:
    _instance = None
    _pool = None

    db_params = {
        'host': os.getenv('DATABASE_HOST'),
        'port': os.getenv('DATABASE_PORT'),
        'database': os.getenv('DATABASE_NAME'),
        'user': os.getenv('DATABASE_USER'),
        'password': os.getenv('DATABASE_PASSWORD')
    }

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Database, cls).__new__(cls)
        return cls._instance

    @classmethod
    async def get_pool(cls):
        # Lazily create the pool on first use and cache it on the class.
        if cls._pool is None:
            cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
        return cls._pool

    @classmethod
    async def write(cls, result):
        pool = await cls.get_pool()
        try:
            async with pool.acquire() as connection:
                await connection.execute('''
                    INSERT INTO tables.results(
                        result
                    ) VALUES($1)
                ''', result)
            return
        except Exception as e:
            raise e
The demo write code:

import asyncio


async def fake_result(i):
    print(f'generating fake result {i}')
    await Database.write(i)
    return


async def run_functions_concurrently():
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)


def main():
    asyncio.run(run_functions_concurrently())


if __name__ == "__main__":
    main()
When asyncio.gather is invoked for the list of tasks, the tasks run concurrently and a new connection pool is created for each task: every task evaluates cls._pool is None before the first await asyncpg.create_pool() call has completed, so each task ends up creating its own pool. You can confirm this by adding a print statement (e.g. "This connection pool is never reused") in the get_pool method; it is printed for each task run. We can ensure pool reuse by creating the pool before scheduling the tasks.
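A minimal sketch of that fix, reusing the Database class and fake_result coroutine from the snippets above (everything else is unchanged):

```python
import asyncio


async def run_functions_concurrently():
    # Create and cache the pool once, before any task can hit the
    # `cls._pool is None` check concurrently.
    await Database.get_pool()
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(run_functions_concurrently())
```

With the pool created up front, all 1000 tasks share it; acquires beyond max_size simply wait for a free connection instead of opening new ones.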
Now we encounter another issue. Each task run establishes a connection to the database to perform a single insert. This isn't very performant, and we have a couple of options for orchestrating how we write results:

1. Collect the tasks in batches and then execute them.
2. Enqueue each task result and run a loop that processes results from the queue and writes them to the database.
Let's take a look at the batching implementation.
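A minimal sketch of that idea, reusing fake_result and the shared pool from above (the batch size is illustrative): run the coroutines in fixed-size chunks so only one batch of writes is in flight at a time.

```python
import asyncio

BATCH_SIZE = 50  # illustrative; keep it at or below the pool's max_size


async def run_in_batches():
    await Database.get_pool()  # create the shared pool up front
    indices = list(range(1000))
    for start in range(0, len(indices), BATCH_SIZE):
        batch = indices[start:start + BATCH_SIZE]
        # Each batch runs concurrently; the next batch starts once this one finishes.
        await asyncio.gather(*(fake_result(i) for i in batch))


if __name__ == "__main__":
    asyncio.run(run_in_batches())
```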
We can improve on this with the latter approach, which uses a queue and a fixed set of worker tasks. This has the added benefit of keeping the number of concurrent writes, and therefore the number of connections we try to acquire, bounded by the number of workers.
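A minimal sketch of the queue-and-worker approach, again assuming the Database class from above (NUM_WORKERS and the function names are illustrative):

```python
import asyncio

NUM_WORKERS = 10  # illustrative; keep it at or below the pool's max_size


async def worker(queue):
    # Each worker pulls results off the queue and writes them one at a time,
    # so at most NUM_WORKERS connections are acquired concurrently.
    while True:
        result = await queue.get()
        try:
            await Database.write(result)
        finally:
            queue.task_done()


async def run_with_workers():
    await Database.get_pool()  # create the shared pool up front
    queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(NUM_WORKERS)]

    for i in range(1000):
        await queue.put(i)

    await queue.join()  # wait until every queued result has been written
    for w in workers:
        w.cancel()  # the workers loop forever, so cancel them when done


if __name__ == "__main__":
    asyncio.run(run_with_workers())
```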