Why should Redis stream consumers in the same consumer group each specify their own name?


The code below shows how I tested the Redis stream functions.

I found that different processes using the same consumer name compete to consume messages from the same stream. As I understand it, if this behavior is expected, Redis would have no reason to let callers specify a consumer name.

Is my understanding wrong, or am I using the wrong approach?

import asyncio
import aioredis

# consumer named "consumer_a", reading from two streams
async def consume_a():
    # create the client once, not on every loop iteration
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        while True:
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

# consumer named "consumer_b", reading from two streams

async def consume_b():
    # create the client once, not on every loop iteration
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        while True:
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

# create the consumer groups before running the script

async def config_group_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

async def config_group_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

# producers

async def produce_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

async def produce_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

# test code

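async def main():
    # two coroutines consume messages from two streams with the same consumer name
    await asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1())

if __name__ == "__main__":
    # asyncio.run() needs a coroutine; passing asyncio.gather(...) directly raises an error
    asyncio.run(main())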

There are 2 answers

Ali Malek On

According to the Redis documentation:

One of the guarantees of consumer groups is that a given consumer can only see the history of messages that were delivered to it, so a message has just a single owner.
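
To make the quoted guarantee concrete, here is a toy in-memory model of a consumer group. It is not Redis and not the aioredis API; `ToyGroup`, `read_new`, and `history` are hypothetical names invented for illustration. It only mimics the bookkeeping: each new entry is handed to exactly one consumer name, and the history is kept per name.

```python
from collections import defaultdict


class ToyGroup:
    """In-memory stand-in for one stream's consumer group (illustration only)."""

    def __init__(self, entries):
        self.undelivered = list(entries)   # entries no consumer has received yet
        self.pending = defaultdict(list)   # consumer name -> entries delivered to it

    def read_new(self, consumer):
        """Roughly what reading with ID '>' and count=1 does: hand out one new entry."""
        if not self.undelivered:
            return None
        entry = self.undelivered.pop(0)
        self.pending[consumer].append(entry)  # the entry now has a single owner
        return entry

    def history(self, consumer):
        """Roughly what reading with ID '0' does: only this name's own entries."""
        return list(self.pending[consumer])


# Two processes with distinct names: each sees only its own history.
group = ToyGroup(["m1", "m2", "m3"])
group.read_new("consumer_a")
group.read_new("consumer_b")
group.read_new("consumer_a")
print(group.history("consumer_a"))  # ['m1', 'm3']
print(group.history("consumer_b"))  # ['m2']

# Two processes that both call themselves "consumer_a": the group cannot
# tell them apart, so they share one history.
shared = ToyGroup(["m1", "m2"])
shared.read_new("consumer_a")  # process 1
shared.read_new("consumer_a")  # process 2
print(shared.history("consumer_a"))  # ['m1', 'm2']
```

In the second case the group still delivers each new message only once, which matches the competing behavior seen in the question, but neither process can recover "its" messages separately afterwards.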

Read these documents for more information:

jian On

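The name lets each consumer retrieve its own pending entries list (PEL). Consumers that share a name also share one PEL, so they compete for the same pending entries or process them more than once.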
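
As a rough sketch of what "getting its own PEL" can look like, the helper below re-reads a consumer's pending entries by passing the ID "0" instead of ">" and then acknowledges them. `drain_pending` is a hypothetical helper, not part of any library; it assumes a client whose `xreadgroup` and `xack` coroutines take the same arguments as in the question's aioredis code. Note that with `noack=True`, as in the question, entries are never added to the PEL, so this re-read would return nothing.

```python
import asyncio


async def drain_pending(client, stream, group, consumer, batch=10):
    """Reprocess and acknowledge entries still pending for `consumer`.

    Returns the number of entries handled.
    """
    handled = 0
    while True:
        # ID "0" (rather than ">") asks for entries already delivered to this
        # consumer name but not yet acknowledged, i.e. its own PEL.
        reply = await client.xreadgroup(
            group, consumer, streams={stream: "0"}, count=batch
        )
        entries = reply[0][1] if reply else []
        if not entries:
            return handled
        for entry_id, fields in entries:
            print(consumer, "reprocessing", entry_id, fields)
        # Acknowledging removes the entries from the PEL, so the next
        # read with "0" returns the following batch (or nothing).
        await client.xack(stream, group, *[entry_id for entry_id, _ in entries])
        handled += len(entries)


# Usage (needs a running server; names taken from the question):
#   redis = aioredis.from_url("redis://localhost:36379")
#   asyncio.run(drain_pending(redis, "test_stream", "test_consumer_group", "consumer_a"))
```

If two processes ran this with the same consumer name, both would receive the same pending entries, which is the repeated consumption described above.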