Asyncpg.pool creates more connections than its max_size

  • asyncpg version: 0.29.0
  • PostgreSQL version: "PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit"
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Nope, using Dockerized container
  • Python version: 3.11.5
  • Platform: MacBook-Air Darwin Kernel Version 21.1.0: Wed Oct 13 17:33:24 PDT 2021; root:xnu-8019.41.5~1/RELEASE_ARM64_T8101 arm64
  • Do you use pgbouncer?: Nope
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: n/a
  • Can the issue be reproduced under both asyncio and uvloop?: Nope

I have a singleton for a Database entity, which is then used to write some primitive data. When I run the write function 1000 times concurrently using asyncio.gather(), the database reports more connections than the max_size of the asyncpg pool. For example, during one test the database reported 857 active connections while the pool reported only 62 active connections. No other clients or operations were running during the test. When I do the same thing under uvloop, it crashes with ConnectionResetError: [Errno 54] Connection reset by peer if I try to run more tasks than the size of the pool.

Is this normal pool behavior?

I use the code below (the write function is simplified):

The database code:

  import os

  import asyncpg


  class Database:
      _instance = None
      _pool = None
      db_params = {
          'host': os.getenv('DATABASE_HOST'),
          'port': os.getenv('DATABASE_PORT'),
          'database': os.getenv('DATABASE_NAME'),
          'user': os.getenv('DATABASE_USER'),
          'password': os.getenv('DATABASE_PASSWORD')
      }

      def __new__(cls, *args, **kwargs):
          if cls._instance is None:
              cls._instance = super(Database, cls).__new__(cls)
          return cls._instance

      @classmethod
      async def get_pool(cls):
          if cls._pool is None:
              cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
          return cls._pool

      @classmethod
      async def write(cls, result):
          pool = await cls.get_pool()
          try:
              async with pool.acquire() as connection:
                  await connection.execute('''
                      INSERT INTO tables.results(
                          result
                      ) VALUES($1)
                  ''', result)
          except Exception as e:
              raise e

The demo write code:

import asyncio

db = Database()  # singleton instance


async def fake_result(i):
    print(f'generating fake result {i}')
    await db.write(i)
    return


async def run_functions_concurrently():
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)


def main():
    asyncio.run(run_functions_concurrently())


if __name__ == "__main__":
    main()
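For reference, one way to observe the server-side connection count during such a test is to query pg_stat_activity. This is only a sketch (the helper name is made up); it reuses the Database.db_params defined above:

    import asyncio
    import asyncpg


    async def count_server_connections():
        # Open a one-off connection with the same parameters as the pool
        conn = await asyncpg.connect(**Database.db_params)
        try:
            # Number of backends connected to this database right now
            return await conn.fetchval(
                "SELECT count(*) FROM pg_stat_activity WHERE datname = $1",
                Database.db_params['database'],
            )
        finally:
            await conn.close()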

1 Answer

Answer from Oluwafemi Sule:

When asyncio.gather is invoked on the list of tasks, they all start running concurrently. Every task evaluates `cls._pool is None` before any of them has finished awaiting asyncpg.create_pool, so each task ends up creating its own connection pool. You can see this by adding a print statement to the get_pool method: the text "This connection pool is never reused." is printed once per task.

      @classmethod
      async def get_pool(cls):
          if cls._pool is None:
              print("This connection pool is never reused.")
              cls._pool = await asyncpg.create_pool(
                  **cls.db_params, min_size=1, max_size=150)
          return cls._pool
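One way to close this race, shown here only as a sketch on top of the class from the question (the _pool_lock attribute is an addition, not part of the original code), is to guard pool creation with an asyncio.Lock so only the first caller actually creates the pool:

    import asyncio
    import asyncpg


    class Database:
        _pool = None
        _pool_lock = asyncio.Lock()  # safe to create outside a loop on Python 3.10+
        db_params = {}               # same host/port/database/user/password as in the question

        @classmethod
        async def get_pool(cls):
            # Tasks that lose the race wait on the lock, then re-check the
            # attribute and reuse the pool stored by the winner.
            if cls._pool is None:
                async with cls._pool_lock:
                    if cls._pool is None:
                        cls._pool = await asyncpg.create_pool(
                            **cls.db_params, min_size=1, max_size=150)
            return cls._pool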

We can ensure pool reuse by creating the pool once, before scheduling the tasks:

async def run_functions_concurrently():
   await db.get_pool()
   tasks = [fake_result(i) for i in range(1000)]
   await asyncio.gather(*tasks)

Now we encounter another issue. Each task acquires a connection from the pool just to perform a single-row insert, and with 1000 tasks competing for the pool at once this isn't very performant. We have a couple of options for orchestrating how we write results:

  1. Collect the tasks in batches and then execute them.

  2. Enqueue each task result and run a loop to process results from the queue, subsequently writing them to the database.

Let's take a look at the batching implementation:

async def run_batches(jobs, batch_size):
    await db.get_pool()

    # Split the list of jobs into batches
    batches = [jobs[i:i + batch_size] for i in range(0, len(jobs), batch_size)]

    # Run the jobs within each batch concurrently, one batch at a time
    for batch in batches:
        await asyncio.gather(*batch)


def main():
    batch_size = 3
    jobs = [fake_result(i) for i in range(1000)]
    asyncio.run(run_batches(jobs, batch_size))
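If the goal is simply to get many rows into the table efficiently, another option worth noting (a sketch, not part of the original answer) is to batch the row values themselves and insert them with connection.executemany, which runs the statement for every parameter tuple over a single pooled connection. The write_many helper below is illustrative and assumes the tables.results table and pool from the question:

    async def write_many(results):
        # Reuse the shared pool created by Database.get_pool()
        pool = await Database.get_pool()
        async with pool.acquire() as connection:
            # One INSERT per parameter tuple, all over this single connection
            await connection.executemany(
                'INSERT INTO tables.results(result) VALUES($1)',
                [(r,) for r in results],
            )

    # Usage: asyncio.run(write_many(list(range(1000))))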

We can improve on this with the latter approach, which uses workers and a queue. This caps how many jobs run at once, and therefore how many connections (and, with the racy get_pool, how many connection pools) can exist at the same time.

async def worker(queue):
    while True:
        job = await queue.get()
        if job is None:
            # Mark the shutdown sentinel as done so queue.join() can finish
            queue.task_done()
            break
        # Process job from queue
        await job
        queue.task_done()


async def run_workers(jobs, num_workers):
    queue = asyncio.Queue()
    # At most num_workers jobs run at a time, so at most num_workers
    # connection pools can be created by the racy get_pool
    workers = [asyncio.create_task(worker(queue)) for _ in range(num_workers)]

    for job in jobs:
        await queue.put(job)

    # Send signal to quit workers
    for _ in range(num_workers):
        await queue.put(None)

    # Process all enqueued tasks
    await queue.join()

    # Cancel worker tasks
    for worker_task in workers:
        worker_task.cancel()

    # Wait for worker tasks to complete
    await asyncio.gather(*workers, return_exceptions=True)


def main():
    jobs = [fake_result(i) for i in range(1000)]
    num_workers = 3 
    asyncio.run(run_workers(jobs, num_workers))
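A lighter-weight variation on the same idea, sketched below under the assumption that db and fake_result are defined as in the question, is to keep asyncio.gather but cap concurrency with an asyncio.Semaphore so that only a bounded number of jobs hold pool connections at once:

    import asyncio


    async def run_with_semaphore(jobs, limit):
        # Create the pool up front so every task shares it
        await db.get_pool()

        semaphore = asyncio.Semaphore(limit)

        async def guarded(job):
            # Only `limit` jobs may proceed at the same time;
            # the rest wait here instead of piling onto the pool.
            async with semaphore:
                await job

        await asyncio.gather(*(guarded(job) for job in jobs))


    def main():
        jobs = [fake_result(i) for i in range(1000)]
        asyncio.run(run_with_semaphore(jobs, limit=50))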