Kafka Streams topology initially dropping messages to intermediate topics


We are using Kafka Streams in a CDC setup that streams table-level database changes into Kafka, aggregates them quite heavily, and eventually streams the result topics into OpenSearch.

I've noticed some strange behaviour when starting the Kafka Streams applications after an initial database snapshot has been streamed into Kafka. Processing that snapshot data seems to lose messages when the intermediate topics used by the Streams topologies do not exist yet (and are auto-created on demand).

For example, I see about 3000 messages produced where I'd expect over 6000.

The topology (in this case) takes an input topic and performs a series of 1:1 foreign-key (FK) left joins against other topics, so I'd expect the message count of the resulting topic to roughly match that of the input topic. Specifically, the loss first appears on one of the first internally created FK join -changelog topics and then carries on through the entire topology.
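To make that expectation concrete, here is a minimal plain-Java sketch of left-join semantics. It does not use Kafka Streams at all; the class name `LeftJoinCountSketch`, the map-based tables, and the sample IDs are hypothetical and only illustrate why a 1:1 FK left join should emit one result per input key, even when the right-hand side has no match.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class LeftJoinCountSketch {

    // folders:  folderId -> agencyId (the foreign key), hypothetical stand-in for the folder table
    // agencies: agencyId -> agency name, hypothetical stand-in for the agencies table
    // Left-join semantics: every folder yields exactly one result; the agency
    // part is null when the foreign key has no match on the right side.
    static Map<String, String> leftJoin(Map<String, String> folders, Map<String, String> agencies) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> folder : folders.entrySet()) {
            String agencyName = agencies.get(folder.getValue());
            result.put(folder.getKey(), folder.getKey() + " -> " + agencyName);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> folders = new LinkedHashMap<>();
        folders.put("f1", "a1");
        folders.put("f2", "unknown-agency");

        Map<String, String> agencies = new HashMap<>();
        agencies.put("a1", "Agency One");

        Map<String, String> joined = leftJoin(folders, agencies);
        // The number of results equals the number of input keys.
        System.out.println(joined.size() + " results for " + folders.size() + " folders");
    }
}
```

This only counts distinct keys; the number of records actually written to a changelog topic can differ from it for other reasons, so the sketch describes the expectation, not the behaviour of the real topology.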

Assume this snippet:

            streamsBuilder //
                    .table( //
                            folderTopicName, //
                            Consumed.with( //
                                    folderKeySerde, //
                                    folderSerde)) //
                    .leftJoin( //
                            agencies, //
                            Folder::agencyIdValue, //
                            AggregateFolder::new, //
                            TableJoined.as("folder-to-agency"), //
                            Materializer //
                                    .<FolderId, AggregateFolder>named("folder-to-agency-materialized") //
                                    .withKeySerde(folderKeySerde) //
                                    .withValueSerde(aggregateFolderSerde)) //
                    .leftJoin( //
                            documents, //
...

The folder-to-agency-materialized-changelog topic shows significantly fewer messages than were fed into the initial folderTopicName topic.

While starting up, the application logs lots of messages like these:

kstreams-folder-aggregator-folder-to-agency-subscription-response-topic=UNKNOWN_TOPIC_OR_PARTITION, ...

(For all kinds of intermediate topics)
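For reference, the internal topics involved in one such join can be listed with a small helper like the sketch below. It assumes the application.id is "kstreams-folder-aggregator" (inferred from the log line above) and that the names follow the pattern `<application.id>-<name>-<suffix>`. The response-topic and changelog suffixes match what appears in this question; the registration-topic suffix is my assumption about the Kafka Streams naming convention and should be checked against the broker's topic list. The class `InternalTopicNames` is hypothetical, not part of Kafka.

```java
final class InternalTopicNames {

    // Assumed pattern: <application.id>-<join name>-subscription-registration-topic
    static String subscriptionRegistrationTopic(String applicationId, String joinName) {
        return applicationId + "-" + joinName + "-subscription-registration-topic";
    }

    // Pattern seen in the log line: <application.id>-<join name>-subscription-response-topic
    static String subscriptionResponseTopic(String applicationId, String joinName) {
        return applicationId + "-" + joinName + "-subscription-response-topic";
    }

    // Pattern seen in the question: <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "kstreams-folder-aggregator"; // assumption, inferred from the log line
        System.out.println(subscriptionRegistrationTopic(appId, "folder-to-agency"));
        System.out.println(subscriptionResponseTopic(appId, "folder-to-agency"));
        System.out.println(changelogTopic(appId, "folder-to-agency-materialized"));
    }
}
```

Comparing these names with the topics that actually exist on the broker before the application starts shows which of them would have to be auto-created during the first run.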

In addition, when I stream another snapshot into Kafka while the Streams applications are already running (so all necessary intermediate topics already exist), the number of messages that pass through the topology matches the expected count.

To sum up: what might cause this initial message loss? Is the (automatic) creation of the intermediate topics somehow involved, even though the logs show only WARN-level messages about it?
