KSQLDB java client error during PUSH query

390 views Asked by At

I try to run basic example of usage PUSH query using ksqldb java client:

public class Main {

    public static String KSQLDB_SERVER_HOST = "localhost";
    public static int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT);
        Client client = Client.create(options);

        client.streamQuery("SELECT * FROM users EMIT CHANGES;")
                .thenAccept(streamedQueryResult -> {
                    System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());

                    RowSubscriber subscriber = new RowSubscriber();
                    streamedQueryResult.subscribe(subscriber);
                }).exceptionally(e -> {
            System.out.println("Request failed: " + e);
            return null;
        });

        client.close();
    }
}

I run Confluent env using docker-compose file: https://github.com/confluentinc/cp-all-in-one/blob/6.0.0-post/cp-all-in-one/docker-compose.yml and create users topic with data.

But got exception inside io.netty.resolver.AddressResolverGroup class, getResolver(final EventExecutor executor) method:

Request failed: java.util.concurrent.CompletionException: java.lang.IllegalStateException: executor not accepting a task

But all works fine when I run PULL query with synchronous usage:

StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM users EMIT CHANGES;").get();

for (int i = 0; i < 10; i++) {
  // Block until a new row is available
  Row row = streamedQueryResult.poll();
  if (row != null) { 
    System.out.println("Row: " + row.values());
  }
}
1

There are 1 answers

0
Rodrigo Pereira Fraga On

You should not close the client connection while streaming.

Remove the client.close(); to test.