How to avoid error 429 (Too Many Requests) in Python with asyncio

I am using the following code to make requests with the aiohttp client. The server I am sending requests to has a limit of 30k requests per hour per IP, so I am getting 429 (Too Many Requests) errors. I want to put the job to sleep whenever it hits the limit.

I can extract x_rateLimit_reset from the response headers, so I thought I could use it to put the job to sleep, but I observed very strange behavior: sometimes the sleep time becomes negative, and sometimes the job gets stuck in sleep mode.

For example, the last time I ran the job, it first slept for 2000 seconds; after that time had passed, it tried to sleep for another 2500 seconds and got stuck in sleep mode. I think the other parallel processes may have caused the issue, so I was wondering how to deal with the Too Many Requests error when using asyncio.

# NOTE(review): the original decorator call was cut off after the exception
# tuple; any further arguments it had (e.g. a retry limit) are not visible
# here and must be restored by hand. Without one, backoff retries indefinitely.
@backoff.on_exception(
    backoff.expo,
    (asyncio.TimeoutError,
     aiohttp.client_exceptions.ServerDisconnectedError,
     TooManyRequests),
)
async def fetch(self, url, session, params):
    """GET ``url`` and return the decoded JSON body.

    When the decoded body reports ``status == 429``, the coroutine waits until
    the moment given by the ``X-RateLimit-Reset`` response header and then
    raises ``TooManyRequests`` so that the ``backoff`` decorator retries.

    Args:
        url: Address to request.
        session: Object whose ``get(url, params=...)`` returns an async
            context manager yielding the response (presumably an
            ``aiohttp.ClientSession`` - confirm against the caller).
        params: Query parameters forwarded to ``session.get``.

    Returns:
        The parsed JSON body, or ``None`` when the body could not be parsed.

    Raises:
        TooManyRequests: The server answered with a 429 status in the body.
    """
    retry_after = None
    try:
        async with session.get(url, params=params) as response:
            now = int(time.time())
            # NOTE(review): the awaited expression was lost in the original
            # snippet; reading the body as text is assumed because the next
            # statement feeds it to json.loads.
            output = await response.text()
            output = json.loads(output)

            if 'status' in output and output['status'] == 429:
                x_rateLimit_reset = int(response.headers['X-RateLimit-Reset'])
                # The reset moment may already lie in the past (clock skew or
                # a slow response); never ask for a negative sleep.
                retry_after = max(x_rateLimit_reset - now, 0)
    except (TypeError, json.decoder.JSONDecodeError) as e:
        # NOTE(review): the original handler body was lost. It also listed
        # asyncio.TimeoutError and ServerDisconnectedError; those are left to
        # propagate here so the backoff decorator above can actually retry.
        print(str(e))
        return None

    if retry_after is not None:
        print("sleep mode")
        print("The job will sleep for {} seconds".format(retry_after))
        # asyncio.sleep suspends only this coroutine; a blocking time.sleep
        # would freeze the event loop and every other in-flight request.
        # Sleeping after the ``async with`` also frees the connection first.
        await asyncio.sleep(retry_after)
        raise TooManyRequests()

    return output

    async def bound_fetch(self, sem, url, session, params):
        """Run ``self.fetch`` for ``url`` while holding the semaphore ``sem``.

        Returns a dict with the requested ``url`` and the fetched ``output``.
        """
        # The semaphore is released when the ``async with`` block exits,
        # whether by return or by exception.
        async with sem:
            return {"url": url, "output": await self.fetch(url, session, params)}

Edited: This is how I initiate bound_fetch and define the URLs:

def get_responses(self, urls, office_token, params=None):
    """Synchronously fetch all ``urls`` and return the collected responses.

    Args:
        urls: URLs handed to ``self.run``.
        office_token: Credential handed to ``self.run``.
        params: Optional query parameters handed to ``self.run``.

    Returns:
        Whatever the ``self.run`` coroutine returns.
    """
    # NOTE(review): the coroutine expression was lost in the original line
    # (``ensure_future(, urls, params)``); ``self.run(office_token, ...)`` is
    # reconstructed from the remaining arguments and run()'s signature.
    # asyncio.run creates a fresh event loop, runs the coroutine to completion
    # and closes the loop again.
    return asyncio.run(self.run(office_token, urls, params))

async def run(self, office_token, urls, params):
    """Fetch every URL in ``urls`` concurrently and return the results.

    Args:
        office_token: First argument given to ``BasicAuth``; the password is
            a single space.
        urls: Iterable of URLs to request.
        params: Query parameters passed along to every request.

    Returns:
        The list produced by ``asyncio.gather`` over one ``bound_fetch`` call
        per URL, in the same order as ``urls``.
    """
    # At most 200 fetches hold the semaphore at the same time.
    sem = asyncio.BoundedSemaphore(200)
    timeout = ClientTimeout(total=1000)

    # NOTE(review): ssl=False is passed to the connector - confirm that
    # skipping TLS verification is intended.
    async with ClientSession(auth=BasicAuth(office_token, password=' '), timeout=timeout,
                             connector=TCPConnector(ssl=False)) as session:
        # Every scheduled task must be collected here: the original built the
        # tasks but never stored them, so gather() always received nothing
        # and returned an empty list.
        tasks = [
            asyncio.ensure_future(self.bound_fetch(sem, url, session, params))
            for url in urls
        ]
        responses = await asyncio.gather(*tasks)
        return responses

# Build one URL per page of the current batch. Batch ``batch`` covers
# ``chunk_size`` consecutive page numbers starting at batch * chunk_size + 1.
first_page = batch * chunk_size + 1
last_page = chunk_size * (1 + batch)
urls = []
for page_number in range(first_page, last_page + 1):
    resource = "{}?page={}&api_key={}".format(object_name, page_number, self.api_keys)
    urls.append("{}/{}".format(self.base_url, resource))

valentinmk On BEST ANSWER

The main reason is that you are using time.sleep() instead of await asyncio.sleep().


Here is a minimal working solution, with some comments on how it works.

Please use it to adapt your solution.

Take a look at asyncio-throttle.

import aiohttp
import asyncio
from datetime import datetime

async def fetch(session, task):  # fetching urls and mark result of execution
    """Request ``task['url']`` and record the outcome in ``task``.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with ``status`` and an awaitable ``text()``.
        task: Dict with at least the key ``'url'``. ``task['result']`` is set
            to the HTTP status code and ``task['status']`` to ``'done'``.

    Any exception from the request propagates, but ``task['status']`` is still
    set to ``'done'`` so a caller polling the status is not left waiting
    forever; ``task['result']`` then keeps whatever value it had.
    """
    try:
        async with session.get(task['url']) as response:
            # Non-200 answers (429 included) are only recorded, not handled;
            # use response.raise_for_status() or a retry here if needed.
            task['result'] = response.status
            await response.text()  # just to be sure we acquire data
            print(f"{str(datetime.now())}: Got result of {task['url']}")  # logging
    finally:
        task['status'] = 'done'

async def fetch_all(session, urls, persecond):
    """Fetch all ``urls`` with at most ``persecond`` requests in flight.

    Args:
        session: Session object forwarded to ``fetch``.
        urls: Iterable of URLs to request.
        persecond: Maximum number of fetches allowed to run at the same time.

    Returns:
        A list with one dict per URL (keys ``'url'``, ``'result'``,
        ``'status'``), in the order of ``urls``. Empty when ``urls`` is empty.
    """
    # convert to list of dicts
    url_tasks = [{'url': i, 'result': None, 'status': 'new'} for i in urls]
    # Keep strong references to scheduled tasks: the event loop itself holds
    # only weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    in_flight = set()
    n = 0  # counter
    while True:
        # calc how many tasks are fetching right now
        running_tasks = sum(1 for i in url_tasks if i['status'] == 'fetch')
        # calc how many tasks still need to be executed
        is_tasks_to_wait = sum(1 for i in url_tasks if i['status'] != 'done')
        # check we are not at the end of the list (n < len())
        # check we have room for one more task
        if n < len(url_tasks) and running_tasks < persecond:
            url_tasks[n]['status'] = 'fetch'
            # Here is the main trick:
            # a task scheduled here is only queued on the running loop; it
            # starts executing once this coroutine yields at an await below.
            job = asyncio.create_task(fetch(session, url_tasks[n]))
            in_flight.add(job)
            job.add_done_callback(in_flight.discard)
            n += 1
            print(f'Schedule tasks {n}. '
                  f'Running {running_tasks} '
                  f'Remain {is_tasks_to_wait}')
        # Check the persecond constraint and wait a second (or period)
        if running_tasks >= persecond:
            await asyncio.sleep(1)
        # Here is another main trick:
        # to keep asyncio.run (or loop.run_until_complete) executing we need
        # to wait a little, then check whether all tasks are done, and so on.
        if is_tasks_to_wait == 0:
            # All tasks done - without this exit the loop never terminates.
            break
        await asyncio.sleep(0.1)  # wait all tasks done
    return url_tasks

async def main():
    """Fetch the example URLs with at most 3 requests in flight.

    Returns:
        The list of task dicts produced by ``fetch_all``.
    """
    # NOTE(review): the original URL list was lost (only an empty string and
    # an unclosed bracket remained); these entries are placeholders - replace
    # them with the real URLs.
    urls = ['https://example.com/?1',
            'https://example.com/?2',
            'https://example.com/?3',
            'https://example.com/?4',
            'https://example.com/?5']
    async with aiohttp.ClientSession() as session:
        res = await fetch_all(session, urls, 3)
    return res

if __name__ == '__main__':
    # NOTE(review): the call itself was missing from the original snippet;
    # it is reconstructed from the comments below, which describe asyncio.run.
    asyncio.run(main())
    # (asyncio.run) cancels all pending tasks (we do not have any,
    #  because we check that every task is done)
    # (asyncio.run) awaits the cancellation of all tasks
    # (asyncio.run) stops the loop
    # exit program