I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands and expect messages to be pushed rather than pulled, so I don't think a while loop is required. However, the following code does not seem to work.
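import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

public static void main(String[] args) throws InterruptedException {
    RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
            .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
            .thenAccept(System.out::println);
    // Keep the main thread alive so the callback has a chance to run.
    Thread.currentThread().join();
}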
It just prints whatever the stream has when the program starts and does not print the messages that are added when the program is running. Isn't the callback supposed to be called for every message that is newly added into the stream?
I think you should use the xgroupCreate method to create the link between the consumer and the group; otherwise you will get an error.
The example code is as follows:
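As a rough sketch only, and not necessarily the code this answer had in mind: a single xreadgroup call returns one future, and a future completes once, so the callback in the question fires a single time. One way to keep receiving entries is to issue the next read whenever the previous one completes. The helper below uses only the JDK; the names ReadLoop, poll, readOnce and keepRunning are hypothetical and not part of Lettuce, and the Lettuce wiring shown in the comment is an assumption about how it would be plugged in.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper (not a Lettuce class): re-issues an async read
// each time the previous read completes.
final class ReadLoop {

    // readOnce stands in for one XREADGROUP call. Assumed wiring with Lettuce:
    //   () -> commands.xreadgroup(io.lettuce.core.Consumer.from("G1", "c1"),
    //                             XReadArgs.Builder.block(5000),
    //                             StreamOffset.lastConsumed("my-stream"))
    // The returned future completes when keepRunning turns false,
    // or exceptionally if a read or the handler fails.
    public static <T> CompletableFuture<Void> poll(
            Supplier<CompletionStage<List<T>>> readOnce,
            Consumer<? super T> handler,
            BooleanSupplier keepRunning) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        step(readOnce, handler, keepRunning, done);
        return done;
    }

    private static <T> void step(
            Supplier<CompletionStage<List<T>>> readOnce,
            Consumer<? super T> handler,
            BooleanSupplier keepRunning,
            CompletableFuture<Void> done) {
        if (!keepRunning.getAsBoolean()) {
            done.complete(null);
            return;
        }
        // whenCompleteAsync hands the callback to the common pool, so the
        // handler does not run on the thread that completed the read and
        // the chain of reads does not grow the call stack.
        readOnce.get().whenCompleteAsync((batch, error) -> {
            if (error != null) {
                done.completeExceptionally(error);
                return;
            }
            try {
                if (batch != null) {
                    batch.forEach(handler); // a timed-out read may yield nothing
                }
                step(readOnce, handler, keepRunning, done); // schedule the next read
            } catch (RuntimeException e) {
                done.completeExceptionally(e);
            }
        });
    }
}
```

With a blocking read (the BLOCK option of XREADGROUP), each call waits on the server for new entries, so this behaves much like a push without a busy loop. When reading through a consumer group, the handler would normally also acknowledge each entry with xack once it has been processed.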