I have to process every message from redis asynchronously. Here is my attempt with aioredis:
import asyncio
import aioredis
async def reader(channel: aioredis.client.PubSub):
while True:
data = None
try:
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
data = message["data"]
except asyncio.TimeoutError:
pass
if data is not None:
await process_message(data)
async def process_message(message):
print(f"start process {message=}")
await asyncio.sleep(10)
print(f"+processed {message=}")
async def publish(redis, channel, message):
print(f"-->publish {message=} to {channel=}")
result = await redis.publish(channel, message)
print(" +published")
return result
async def main():
redis = aioredis.from_url("redis://localhost")
pubsub = redis.pubsub()
await pubsub.subscribe("channel:1", "channel:2")
future = asyncio.create_task(reader(pubsub))
await publish(redis, "channel:1", "Hello")
await publish(redis, "channel:2", "World")
await future
if __name__ == "__main__":
asyncio.run(main())
The problem is that aioredis does not get_message
if the previous message was not processed. The messages are processed one by one.
How to solve that issue?
I've found the solution.
Instead of
await process_message(data)
one should useasyncio.ensure_future(process_message(data))
The idea came from AIORedis and PUB/SUB aren't asnyc