aioredis: how to process Redis messages asynchronously?

I need to process every message from Redis asynchronously. Here is my attempt with aioredis:

import asyncio
import aioredis


async def reader(channel: aioredis.client.PubSub):
    while True:
        data = None
        try:
            message = await channel.get_message(ignore_subscribe_messages=True)
            if message is not None:
                print(f"(Reader) Message Received: {message}")
                data = message["data"]
        except asyncio.TimeoutError:
            pass

        if data is not None:
            await process_message(data)


async def process_message(message):
    print(f"start process {message=}")
    await asyncio.sleep(10)
    print(f"+processed {message=}")


async def publish(redis, channel, message):
    print(f"-->publish {message=} to {channel=}")
    result = await redis.publish(channel, message)
    print("     +published")
    return result


async def main():
    redis = aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe("channel:1", "channel:2")

    future = asyncio.create_task(reader(pubsub))

    await publish(redis, "channel:1", "Hello")
    await publish(redis, "channel:2", "World")

    await future


if __name__ == "__main__":
    asyncio.run(main())

The problem is that the reader does not call get_message again until the previous message has been fully processed, so the messages are handled one at a time instead of concurrently.

How can I solve this?

1 answer

SKulibin

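I found a solution.

Instead of await process_message(data), schedule the coroutine with asyncio.ensure_future(process_message(data)). Awaiting the coroutine directly suspends the reader loop until processing finishes. ensure_future wraps the coroutine in a task and returns immediately, so the loop can call get_message again while earlier messages are still being processed. On Python 3.7 and later, asyncio.create_task(process_message(data)) does the same thing and is the preferred way to schedule a coroutine.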

The idea came from the question "AIORedis and PUB/SUB aren't asnyc".
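
Here is a minimal sketch of that scheduling pattern, not a drop-in replacement for the reader above. So that it runs without a Redis server, an asyncio.Queue stands in for the PubSub object, a None item is used as a hypothetical end-of-stream sentinel, and the 10-second sleep is shortened. The results list and the run_demo helper are illustrative names only. It also keeps a reference to each scheduled task, which the asyncio documentation recommends so that a task is not garbage-collected before it finishes.

```python
import asyncio


async def process_message(message, results):
    # Stand-in for slow work (the question sleeps for 10 seconds).
    await asyncio.sleep(0.2)
    results.append(message)


async def reader(queue, results):
    # ASSUMPTION: asyncio.Queue replaces the Redis PubSub object so the
    # sketch is self-contained; None is a hypothetical "stop" sentinel.
    pending = set()
    while True:
        data = await queue.get()
        if data is None:
            break
        # Schedule processing and go straight back to reading.
        task = asyncio.create_task(process_message(data, results))
        # Hold a strong reference until the task is done.
        pending.add(task)
        task.add_done_callback(pending.discard)
    # Let the tasks that are still running finish before returning.
    await asyncio.gather(*pending)


async def run_demo():
    queue = asyncio.Queue()
    results = []
    for item in ("Hello", "World", None):
        queue.put_nowait(item)
    loop = asyncio.get_running_loop()
    started = loop.time()
    await reader(queue, results)
    return results, loop.time() - started


if __name__ == "__main__":
    processed, elapsed = asyncio.run(run_demo())
    print(f"processed {processed} in {elapsed:.2f}s")
```

Because both messages are processed concurrently, the total time should be close to one sleep interval rather than two. If the message rate can exceed the processing rate, the number of in-flight tasks grows without limit, so an asyncio.Semaphore or a fixed pool of worker tasks may be worth adding.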