I am trying to run a basic example of a PUSH query using the ksqlDB Java client:
    public class Main {
        public static String KSQLDB_SERVER_HOST = "localhost";
        public static int KSQLDB_SERVER_HOST_PORT = 8088;

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT);
            Client client = Client.create(options);

            client.streamQuery("SELECT * FROM users EMIT CHANGES;")
                .thenAccept(streamedQueryResult -> {
                    System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());
                    RowSubscriber subscriber = new RowSubscriber();
                    streamedQueryResult.subscribe(subscriber);
                }).exceptionally(e -> {
                    System.out.println("Request failed: " + e);
                    return null;
                });

            client.close();
        }
    }
I run the Confluent environment using this docker-compose file: https://github.com/confluentinc/cp-all-in-one/blob/6.0.0-post/cp-all-in-one/docker-compose.yml and have created a users topic with data.
But I get an exception thrown from the getResolver(final EventExecutor executor) method of the io.netty.resolver.AddressResolverGroup class:
Request failed: java.util.concurrent.CompletionException: java.lang.IllegalStateException: executor not accepting a task
But everything works fine when I consume the same query synchronously, blocking on get() and polling for rows:
    StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM users EMIT CHANGES;").get();
    for (int i = 0; i < 10; i++) {
        // Block until a new row is available
        Row row = streamedQueryResult.poll();
        if (row != null) {
            System.out.println("Row: " + row.values());
        }
    }
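You should not close the client connection while the query is still streaming.
streamQuery() returns a CompletableFuture immediately, so in your code client.close() runs before the request has completed. That is most likely why the executor no longer accepts the task. To test, remove the
    client.close();
call, or call it only after you have finished consuming rows.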
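The ordering problem can be sketched without a ksqlDB server. In the JDK-only example below, FakeClient is a hypothetical stand-in (not the real ksqlDB API) whose requests run on an internal executor that close() shuts down. CloseAfterStreaming, streamThenClose and the row values are made up for illustration as well. The point is only the pattern: block the main thread until the asynchronous callback has finished, and close the client afterwards.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseAfterStreaming {

    // Hypothetical stand-in for the ksqlDB Client (not the real API): requests run
    // on an internal executor, and close() shuts that executor down.
    static class FakeClient implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        CompletableFuture<List<String>> streamQuery(String sql) {
            // Returns immediately; the rows are produced later on the executor thread.
            return CompletableFuture.supplyAsync(
                () -> List.of("row-1", "row-2", "row-3"), executor);
        }

        @Override
        public void close() {
            executor.shutdown();
        }
    }

    // Starts the query, waits for the callback to finish, and only then closes the client.
    public static List<String> streamThenClose() throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (FakeClient client = new FakeClient()) {
            client.streamQuery("SELECT * FROM users EMIT CHANGES;")
                .thenAccept(received::addAll)
                .whenComplete((ignored, error) -> done.countDown());

            // Block the main thread so close() cannot run while the query is in flight.
            done.await(5, TimeUnit.SECONDS);
        } // close() is called here, after the latch has been released or timed out

        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received: " + streamThenClose());
    }
}
```

With the real client the same idea would apply: keep the Client open for as long as the subscriber is expected to receive rows, and call close() only when streaming is finished. The 5-second timeout is an arbitrary choice for this sketch.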