We have a very strange problem with pull queries in ksqlDB.
Setup
First, we have a cluster of 6 machines, each running its own ksqlDB instance.
Second, each node has this configuration:
listeners=http://0.0.0.0:8088
ksql.advertised.listener is set for each node
ksql.heartbeat.enable=true
ksql.streams.num.standby.replicas=1
ksql.query.pull.enable.standby.reads=true
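For completeness, the per-node advertised listener setting looks like this in each node's server properties file (the host name is illustrative):

```properties
# Set in each node's own properties file; host name is an example.
ksql.advertised.listener=http://ksqldb-node-1.internal:8088
```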
We have a scenario very similar to what is described here.
We have an input stream with 60 partitions. Let's call it events.
We declared a stream:
CREATE STREAM EVENTS (EVENT_TYPE STRING, TS STRING)
  WITH (CLEANUP_POLICY='delete',
        FORMAT='json',
        KAFKA_TOPIC='events',
        TIMESTAMP='ts',
        TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');
We defined a table with aggregations like this:
CREATE TABLE EVENTS_HOURLY_COUNTS AS
SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;
This created 3 topics for us: 1 visible and 2 internal (hidden).
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------------------------------------------------------------------------
EVENTS_HOURLY_COUNTS | 60 | 2
_confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-Aggregate-Materialize-changelog | 60 | 2
_confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-GroupBy-repartition | 60 | 2
The problem
When we issue pull queries against this table, it sporadically returns inconsistent results, with no errors in the logs.
Our queries look like this:
SELECT WINDOWSTART, partition, event_count FROM events_hourly_counts WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND <= 1708509600000
We run them against already-closed periods, so newly arrived data shouldn't interfere. We expect one row per partition, i.e. 60 rows per window, but sometimes (roughly 1 query in 10) we get fewer rows, anywhere from 44 to 54, and occasionally even 61.
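The flakiness is easy to surface with a loop that issues the same pull query repeatedly and counts the returned rows. Below is a minimal sketch against ksqlDB's REST `/query` endpoint; the host name is an assumption, and the response parsing assumes the v1 JSON shape (a header element followed by one element per row):

```python
# Repro sketch: issue the same pull query repeatedly via the ksqlDB REST
# /query endpoint and count the data rows in each response.
# Host name is illustrative; any node in the cluster should work.
import json
import urllib.request

KSQLDB_URL = "http://ksqldb-node-1.internal:8088/query"
PULL_QUERY = (
    "SELECT WINDOWSTART, partition, event_count "
    "FROM events_hourly_counts "
    "WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND <= 1708509600000;"
)

def count_data_rows(response_elements):
    """Count the data rows in a parsed /query response.

    The v1 response is a JSON array: the first element carries a
    'header' with the schema, and each subsequent element carries a 'row'.
    """
    return sum(1 for element in response_elements if "row" in element)

def run_once():
    """POST the pull query to one node and return the number of rows."""
    body = json.dumps({"ksql": PULL_QUERY, "streamsProperties": {}}).encode()
    request = urllib.request.Request(
        KSQLDB_URL,
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
    )
    with urllib.request.urlopen(request) as response:
        return count_data_rows(json.load(response))

# Running this many times shows the varying counts, e.g.:
# counts = [run_once() for _ in range(50)]
# print(sorted(set(counts)))  # we expect only {60}, but see 44..61
```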
As mentioned, the queries complete successfully and there are no errors in the log entries. If anyone could help with this case, it would be great. Thanks in advance!
UPD:
I tried creating a table with only one partition:
CREATE TABLE EVENTS_HOURLY_COUNTS_2 WITH (PARTITIONS=1) AS
SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;
So the table has only one partition, but it still contains the keys from 0 to 59, and the behavior is the same.
When I run a pull query spanning 20 hours, I expect to receive 20 * 60 = 1200 rows in the result set. Most of the time it is 1200 rows, but from time to time it can be 1199, 936, or even 1201 rows!
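To narrow down where the rows go missing, it helps to diff the result against the expected (windowstart, partition) grid: if the absent rows cluster on a fixed subset of partitions, that points at one host's state store or standby. A small sketch, assuming each result row is a (windowstart_ms, partition, count) tuple:

```python
# Find which (windowstart, partition) pairs are absent from a pull-query
# result, given the expected time range and partition count.
HOUR_MS = 3_600_000

def missing_pairs(rows, first_window_ms, hours, partitions):
    """Return the (windowstart, partition) pairs absent from `rows`."""
    expected = {
        (first_window_ms + h * HOUR_MS, p)
        for h in range(hours)
        for p in range(partitions)
    }
    seen = {(windowstart, partition) for windowstart, partition, _ in rows}
    return sorted(expected - seen)

# Example with a 2-hour / 3-partition grid where one row is absent:
rows = [
    (0, 0, 10), (0, 1, 11), (0, 2, 12),
    (HOUR_MS, 0, 13), (HOUR_MS, 2, 15),  # (HOUR_MS, 1) is missing
]
print(missing_pairs(rows, 0, 2, 3))  # [(3600000, 1)]
```

Run on the real 20-hour result (hours=20, partitions=60) whenever the count is off, and compare the missing pairs across failures.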