How to continuously listen on a Redis stream using the Lettuce Java library


I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands and expect messages to be pushed rather than pulled, so I don't think a while loop is required. However, the following code does not work as expected.

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

It just prints whatever the stream has when the program starts and does not print the messages that are added when the program is running. Isn't the callback supposed to be called for every message that is newly added into the stream?

4

There are 4 answers

3
sk l On

I think you should use the xgroupCreate method to create the consumer group on the stream before reading; otherwise you will get the following error.

exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.test.TestList.main(TestList.java:57)
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
    at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
    at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
    at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
    at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381)
    at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

The example code is as follows:

package com.test;

import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.List;
public class TestList {
    public static void main(String[] args) throws Exception {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        RedisAsyncCommands<String, String> commands = connection.async();
        RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
        String redisFutureGet = redisFuture.get();
        System.out.println(redisFutureGet);
        // Create the consumer group on the stream before reading from it.
        commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs());
        // Consumer.from(group, name): the second argument is the consumer name (here it happens to match the stream key).
        RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(Consumer.from("group1", "my-stream1"),
                StreamOffset.lastConsumed("my-stream1"));
        List<StreamMessage<String, String>> res = messages.get();
        System.out.println(res);
    }
}
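
Note that a future completes exactly once, so a single xreadgroup call delivers a single reply; to keep receiving messages, the read has to be issued again after each reply. The following is a minimal stdlib-only sketch of that pattern, not Lettuce code: readOnce and the BlockingQueue are hypothetical stand-ins for one blocking stream read and for the stream itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class RepeatedReadSketch {

    // Hypothetical stand-in for one asynchronous blocking read:
    // the returned future completes exactly once, with the next available message.
    static CompletableFuture<String> readOnce(BlockingQueue<String> stream) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return stream.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Issues a new read each time the previous one completes,
    // until `count` messages have been received.
    static List<String> listen(BlockingQueue<String> stream, int count) {
        List<String> received = new ArrayList<>();
        while (received.size() < count) {
            received.add(readOnce(stream).join());
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        stream.add("m1");
        stream.add("m2");
        stream.add("m3");
        System.out.println(listen(stream, 3)); // prints [m1, m2, m3]
    }
}
```

With a real connection the same shape would presumably apply: issue a blocking read, handle the reply, then issue the next read.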
0
user2160372 On

I think Lettuce is only responsible for communicating with Redis, whether in a sync, async, or streaming fashion; it is a low-level library. If you want this kind of high-level functionality, use Spring Data Redis, something like this:

StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener();

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
        StreamMessageListenerContainer.create(connectionFactory, containerOptions);
Subscription subscription = container.receive(StreamOffset.fromStart("key2"), streamListener);
container.start();
//----------------------------------------------------------------

public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}

0
Lakmal On

I know this question is a bit old but the answer could be helpful for someone else. You could repeatedly subscribe to the same Flux like below and it worked for me with xread. I think the same should work for xreadgroup as well.

RedisPubSubReactiveCommands<String, String> commands = connection.reactive();
commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
                .doOnNext(msg -> {
                    sink.tryEmitNext(msg.getBody().get("key"));
                })
                .repeat()
                .subscribe();
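
One caveat with repeating a read that always starts at "$": each re-subscription resolves "$" to the newest entry at that moment, so an entry added between two reads can be skipped. The usual remedy is to use "$" only for the first read and pass the ID of the last received entry afterwards. The sketch below models this with a hypothetical in-memory FakeStream (not a Lettuce type) whose entry IDs are simple counters.

```java
import java.util.ArrayList;
import java.util.List;

class LastIdSketch {

    // Hypothetical in-memory model of a stream: the entry at list index i has ID i + 1.
    static final class FakeStream {
        private final List<String> entries = new ArrayList<>();

        void add(String value) { entries.add(value); }

        long lastId() { return entries.size(); }

        // Entries with an ID greater than afterId, like a read from an explicit ID.
        List<String> readAfter(long afterId) {
            return new ArrayList<>(entries.subList((int) afterId, entries.size()));
        }
    }

    // Runs two reads; "b" arrives between them, while no read is pending.
    static List<String> consume(boolean trackLastId) {
        FakeStream stream = new FakeStream();
        List<String> seen = new ArrayList<>();

        long cursor = stream.lastId();             // first read starts at "$"
        stream.add("a");                           // arrives while the first read is blocked
        List<String> batch = stream.readAfter(cursor);
        seen.addAll(batch);
        long lastSeen = cursor + batch.size();     // ID of the last entry delivered

        stream.add("b");                           // arrives between the two reads

        cursor = trackLastId ? lastSeen : stream.lastId(); // explicit ID vs. "$" again
        stream.add("c");                           // arrives while the second read is blocked
        seen.addAll(stream.readAfter(cursor));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(consume(false)); // prints [a, c]
        System.out.println(consume(true));  // prints [a, b, c]
    }
}
```

Reading through a consumer group with xreadgroup and the last-consumed offset avoids this bookkeeping, since the group tracks the last delivered ID on the server side.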
1
rdo82 On

You could use the Redis reactive commands to achieve this:

RedisReactiveCommands<String, String> commands = connection.reactive();
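// Reactive commands are lazy: nothing is sent to Redis until the returned publisher is subscribed.
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs())
    .subscribe(reply -> {}, Throwable::printStackTrace);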
commands
    .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
    .subscribe(System.out::println, Throwable::printStackTrace);