Caching results in an async environment


I am working on a FastAPI endpoint that performs an I/O-bound operation, which is async for efficiency. However, it takes time, so I would like to cache the result and reuse it for a period of time.

Currently, I have this:

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def _get_expensive_resource(key) -> bool:
    await asyncio.sleep(2)
    return True

@app.get('/')
async def get(key):
    return await _get_expensive_resource(key)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test:app")

I am trying to use the cachetools package to cache the results and I have tried something like the following:

import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
  
app = FastAPI()

async def _get_expensive_resource(key) -> bool:
    await asyncio.sleep(2)
    return True

class ResourceCache(TTLCache):
    def __missing__(self, key):
        loop = asyncio.get_event_loop()
        # Fails here: FastAPI's event loop is already running
        resource = loop.run_until_complete(_get_expensive_resource(key))
        self[key] = resource
        return resource

resource_cache = ResourceCache(124, 300)  # maxsize=124, ttl=300 seconds

@app.get('/')
async def get(key: str):
    return resource_cache[key]

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("test2:app")

However, this fails because, as far as I understand, the __missing__ method is sync, and you can't call async code from sync code that was itself called from async code. The error is:

RuntimeError: this event loop is already running.

A similar error happens if I use plain asyncio instead of uvloop.

For the asyncio event loop, I have tried the nest_asyncio package, but it does not patch uvloop, and even when using it with asyncio, the service seems to freeze after the first use.

Do you have any idea how I could accomplish this?

2 Answers

Answer by Nicolas Martinez (accepted):

Self-answering for others who come across this (including myself in fifteen days):

TTLCache works like a normal Python dictionary: accessing a missing key calls the __missing__ method. So we would like to use the value in the dictionary if present, and if not, fetch the resource in this method. The method should also set the key in the cache (so next time it will be present) and return the value for use this time.

import asyncio

from cachetools import TTLCache

class ResourceCache(TTLCache):
    def __missing__(self, key) -> asyncio.Task:
        # Create a task for the missing key, cache it, and return it
        resource_future = asyncio.create_task(_get_expensive_resource(key))
        self[key] = resource_future
        return resource_future

So, we have a cache (essentially a dictionary) that maps keys to asyncio.Tasks. The tasks are executed asynchronously in the event loop (which is already started by FastAPI!), and when we need the result we can await them in the endpoint code, or really anywhere, as long as it's an async function. Note that awaiting an already-finished Task simply returns its stored result, so a cached entry can be awaited any number of times.

@app.get("/")
async def get(key:str) -> bool:
    return await resource_cache[key]

Calling this endpoint a second time (within the TTL of the cache) will use the cached resource (mocked with True in our example).

Answer by Antonio Gomez Alvarado:

Here is an example of how to cache a FastAPI call using the cachetools library with the same async function as above, without needing any custom class:

from fastapi import FastAPI
from cachetools import TTLCache
import asyncio

app = FastAPI()

# Create a cache with a maximum size of 100 entries and a TTL of 60 seconds
cache = TTLCache(maxsize=100, ttl=60)


async def _get_expensive_resource(key) -> bool:
    await asyncio.sleep(5)
    return True


@app.get("/{key}")
async def get(key):
    # Check if the result is already in the cache
    result = cache.get(key)
    if result is not None:
        print(f"Found it in cache for key {key}")
        return result

    result = await _get_expensive_resource(key)

    # Store the result in the cache
    cache[key] = result

    return result


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

The first time the route is called, the result is computed and stored in the cache. Subsequent calls to the route within the next 60 seconds will return the cached result without recomputing it.

Then I called it locally from my terminal:

curl http://localhost:8000/mykey

The first call took 5 seconds, and within the first minute all subsequent calls got an immediate response.