I want to use ksqlDB to count the messages in a topic, grouped by a column, and then stream the aggregation to a new Kafka topic.
First, I create a stream over the topic:
CREATE STREAM "foo" (
  id BIGINT,
  col BIGINT
)
WITH (
  KAFKA_TOPIC='foo',
  FORMAT='AVRO'
);
This creates the stream successfully, and I get results when I run SET 'auto.offset.reset' = 'earliest'; followed by select * from foo.
Then I create the table using:
CREATE TABLE "bar"
WITH (
  KAFKA_TOPIC='bar',
  FORMAT='AVRO'
)
AS SELECT col, count(*) c
FROM foo
GROUP BY col
EMIT CHANGES;
However, this table stays empty.
I suspect the stream should read the id column from the message key, declared as id BIGINT KEY. The id is present in both the message key and the value, but when I declare id BIGINT KEY, the stream returns no rows. How can I populate the table with the counts?
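For reference, a minimal sketch of the keyed declaration mentioned above. It assumes (this is not stated anywhere in the question) that the key was written as a plain serialized long rather than as Avro. Under that assumption, FORMAT='AVRO' would apply Avro to the key as well as the value, so the sketch splits the two with KEY_FORMAT and VALUE_FORMAT. The stream name foo_keyed is hypothetical.

```sql
-- Sketch only: assumes the message key is a plain serialized long, not Avro.
-- foo_keyed is a hypothetical name used for illustration.
CREATE STREAM foo_keyed (
  id BIGINT KEY,   -- read from the message key
  col BIGINT       -- read from the Avro-encoded value
)
WITH (
  KAFKA_TOPIC='foo',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='AVRO'
);
```

If this stream returns rows with auto.offset.reset set to earliest, the same GROUP BY col query could be pointed at it. If it does not, the ksqlDB processing log is one place to check for deserialization errors.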