Asynchronous code failure when connecting to redis


I created a small class to perform basic operations with redis, using aioredis.

class RedisService:
    def __init__(self, r_url) -> str:
        self.redis = r_url

    async def create_connection(self):
        return await aioredis.create_redis(self.redis)

    async def _get(self, key) -> str:
        try:
            return await self.create_connection().get(key, encoding='utf-8')
        finally:
            await self._close()

    async def _set(self, key, value) -> None:
        await self.create_connection().set(key, value)
        await self._close()

    async def _close(self) -> None:
        self.create_connection().close()
        await self._redis.wait_closed() 

And a test handler that calls the write/read operations for Redis:

@router.post('/perform')
async def index():
    key = 'test'
    value = 'test'
    value = await RedisService(r_url)._set(key, value)
    return {'result': value}

But I get this error:

    await self.create_connection.set(key, value)
AttributeError: 'coroutine' object has no attribute 'set'

I guess the problem could be that the asynchronous code has to be run through an event loop:

asyncio.run(some_coroutine())

But I can't figure out how to build this logic into my code.


There are 2 answers

dirn (best answer)

Your problem is how you use create_connection. You have to call it and await what it returns.

await self.create_connection()

You’ll then need to await set and get as well. As a one-liner this would get messy.

await (await self.create_connection()).set(key, value)

To help clean this up, you should split the awaits into separate statements.

conn = await self.create_connection()
await conn.set(key, value)
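
The difference is easy to reproduce without a Redis server. Below is a minimal sketch in which FakeConnection and this create_connection are hypothetical stand-ins (they are not part of aioredis): calling an async function only produces a coroutine object, and the connection exists only after that coroutine has been awaited.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a Redis connection; stores values in a dict."""

    def __init__(self):
        self.data = {}

    async def set(self, key, value):
        self.data[key] = value

    async def get(self, key):
        return self.data.get(key)


async def create_connection():
    # Stand-in for `await aioredis.create_redis(url)`.
    return FakeConnection()


async def demo():
    pending = create_connection()             # not awaited: just a coroutine object
    has_set_before = hasattr(pending, "set")  # the coroutine itself has no `set`
    conn = await pending                      # awaiting yields the connection
    await conn.set("test", "value")
    return has_set_before, await conn.get("test")


result = asyncio.run(demo())
print(result)
```

The first element of the result shows why the original code raised AttributeError: the object returned by the un-awaited call has no `set` attribute.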

Creating a new connection each time you need to perform an operation can be expensive. I’d recommend changing create_connection in one of two ways.

Either have it attach the connection to your instance

async def create_connection(self):
    self.conn = await aioredis.create_redis(self.redis)

You can call this once after you create an instance of RedisService and then use

await self.conn.set(key, value)
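
Putting this first option together, the service might look roughly like the sketch below. The `connect` parameter is an assumption added here so that the class can be exercised without a server; in real use you would pass `aioredis.create_redis` (aioredis 1.x, as in the question). InMemoryRedis and fake_connect are hypothetical test doubles, not library code.

```python
import asyncio


class RedisService:
    def __init__(self, r_url, connect):
        # `connect` is an async callable such as aioredis.create_redis
        # (assumption: injected so the sketch runs without a Redis server).
        self.r_url = r_url
        self._connect = connect
        self.conn = None

    async def create_connection(self):
        # Await once and keep the connection for later calls.
        self.conn = await self._connect(self.r_url)

    async def get(self, key):
        return await self.conn.get(key, encoding="utf-8")

    async def set(self, key, value):
        await self.conn.set(key, value)

    async def close(self):
        self.conn.close()              # synchronous in aioredis 1.x
        await self.conn.wait_closed()  # this part must be awaited


class InMemoryRedis:
    """Hypothetical test double mimicking the few methods used above."""

    def __init__(self):
        self.data = {}
        self.closed = False

    async def set(self, key, value):
        self.data[key] = value.encode("utf-8")

    async def get(self, key, encoding=None):
        raw = self.data.get(key)
        if raw is None or encoding is None:
            return raw
        return raw.decode(encoding)

    def close(self):
        self.closed = True

    async def wait_closed(self):
        pass


async def fake_connect(url):
    return InMemoryRedis()


async def main():
    service = RedisService("redis://localhost", fake_connect)
    await service.create_connection()
    try:
        await service.set("test", "value")
        return await service.get("test")
    finally:
        await service.close()


value = asyncio.run(main())
print(value)
```

The connection is opened once, reused for both operations, and closed in a `finally` block so it is released even if an operation fails.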

Or you can switch to using a connection pool.

async def create_connection(self):
    return await aioredis.create_redis_pool(self.redis)

Hilesh

Uvicorn provides the event loop when a FastAPI app is launched, so you do not need to call asyncio.run yourself; the Redis get/set calls can simply be awaited inside the handlers. See https://fastapi.tiangolo.com/tutorial/first-steps/
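
To illustrate why asyncio.run does not belong inside a handler, here is a small sketch that needs neither FastAPI nor Redis. fetch_value and handler are hypothetical names; fetch_value stands in for an awaited Redis call. Inside a running loop a plain await works, while asyncio.run raises RuntimeError.

```python
import asyncio


async def fetch_value():
    # Hypothetical stand-in for an awaited Redis call such as `await conn.get(key)`.
    await asyncio.sleep(0)
    return "data"


async def handler():
    # Inside a running loop (as in a FastAPI handler), awaiting is all that is needed.
    awaited = await fetch_value()

    # Calling asyncio.run() here fails because a loop is already running.
    coro = fetch_value()
    try:
        asyncio.run(coro)
        error = None
    except RuntimeError as exc:
        error = str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return awaited, error


# asyncio.run() is only the entry point of a standalone script.
awaited, error = asyncio.run(handler())
print(awaited, "|", error)
```

In other words, asyncio.run is meant for the top level of a script; under uvicorn that role is already filled by the server.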

The code below is adapted from https://github.com/tiangolo/fastapi/issues/1694 to fit the question.

Storing the result of create_connection in the object's state means the connection is awaited once at startup, and every later call reuses it.

When the path "/" is requested, the connection held in the object's state is used to set the value in Redis asynchronously.

from fastapi import FastAPI
from connections import redis_cache

app = FastAPI()

@app.on_event('startup')
async def startup_event():
    await redis_cache.create_connection(r_url="redis://localhost")

@app.on_event('shutdown')
async def shutdown_event():
    await redis_cache._close()

@app.get("/")
async def root():
    key = 'key'
    value = 'data'
    await redis_cache._set(key, value)

@app.get("/value")
async def get_value():
    key = 'key'
    return await redis_cache._get(key)

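# connections.py (the module imported above)
from typing import Optional
from aioredis import Redis, create_redis


class RedisCache:
    def __init__(self) -> None:
        # Set by create_connection() at application startup.
        self.redis_cache: Optional[Redis] = None

    async def create_connection(self, r_url) -> None:
        self.redis_cache = await create_redis(r_url)

    async def _get(self, key) -> str:
        # Decode the stored bytes so the return value matches the annotation.
        return await self.redis_cache.get(key, encoding='utf-8')

    async def _set(self, key, value) -> None:
        await self.redis_cache.set(key, value)

    async def _close(self) -> None:
        # close() is synchronous; wait_closed() must be awaited.
        self.redis_cache.close()
        await self.redis_cache.wait_closed()

redis_cache = RedisCache()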