Kafka Streams exactly-once re-balance aggregation state data loss


I am running 3 Kafka Streams instances with exactly-once processing enabled, but I am losing data when restarting one of the instances (which causes the other 2 to rebalance). If I restart the instance quickly (within session.timeout.ms), so that the other 2 do not rebalance, everything works as expected.
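For what it's worth, static membership should let me widen that restart window so the other two instances never rebalance at all; it is a workaround rather than a fix for the loss itself. A sketch, assuming brokers and clients on Kafka 2.3+, with a placeholder instance id:

// Workaround sketch (not a fix): with static membership the group keeps a
// restarting instance's assignment for up to session.timeout.ms, so the
// remaining instances do not rebalance. Requires Kafka 2.3+.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
        "streams-instance-1"); // placeholder: must be unique per instance
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
        60_000); // allow up to 60s for the restart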

  • Input and output topics are created with 6 partitions.
  • Running 3 Kafka brokers.
  • Producing data with a single Python producer in a loop (acks='all').
  • Outputting data to SQL with a single Kafka Connect instance configured with consumer.override.isolation.level=read_committed.

I am expecting the aggregated data to have the same count as the output of my Python loop, and this works just fine as long as Kafka Streams is not rebalancing.
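To rule out the Connect sink, the committed output can be counted independently with a plain read_committed consumer, which sees exactly the records the sink sees. A minimal sketch; the bootstrap address and topic name are placeholders for my actual POSTGRES_TOPIC setup:

// Sketch: count only records from committed transactions on the output topic.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommittedCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "committed-count");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // same view as the sink
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        long total = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("postgres-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                if (records.isEmpty()) break; // crude: stop once the topic is drained
                total += records.count();
            }
        }
        System.out.println("committed records: " + total);
    }
}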

In short, each streams instance:

  1. Collects session data and updates a session state.
  2. Emits delta updates on the session state, which are then re-partitioned and summed using a windowed aggregation.

Grepping through my own debug output, I'm inclined to believe the problem is related to transferring the aggregation state (see the logging sketch after this list):

  1. Record A, which is an update to session X, adds 0 to the aggregation.
  2. Output from the aggregation is now 6.
  3. Record B, which is an update to session X, adds 1 to the aggregation.
  4. Output from the aggregation is now 7.
  5. Rebalance.
  6. An update to session X (which may or may not be a replay of Record A) adds 0 to the aggregation.
  7. Output from the aggregation is now 6.
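To make replays stand out when grepping, the transformer (shown further down) can log each record's origin; the ProcessorContext passed to init() exposes topic(), partition(), offset() and taskId(). A sketch of the extra logging, mirroring my transform():

// Sketch: tag every session update with its source partition/offset and the
// task that processed it, so post-rebalance replays are visible in the logs.
@Override
public KeyValue<String, Processing.AggregatedData> transform(String sessionid, Processing.IncomingData data) {
    System.out.printf("task=%s src=%s-%d@%d session=%s%n",
            context.taskId(), context.topic(), context.partition(), context.offset(), sessionid);
    // ... existing session-state update logic
    return new KeyValue<>(sessionid, output);
}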

Simplified and stripped-down version of the code (I'm not really a Java developer, so sorry for any non-optimal syntax):

public static void main(String[] args) throws Exception {
    final Properties props = new Properties();
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    final StreamsBuilder builder = new StreamsBuilder();

    final StoreBuilder<KeyValueStore<String, SessionState>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(SESSION_STATE_STORE),
            Serdes.String(),   // keyed by session id, matching the transformer below
            sessionStateSerde
    );
    builder.addStateStore(storeBuilder);

    KStream<String, IncomingData> incomingData = builder.stream(
            SESSION_TOPIC, Consumed.with(Serdes.String(), mediaDataSerde));
    KGroupedStream<MediaKey, AggregatedData> mediaData = incomingData
                .transform(new SessionProcessingSupplier(SESSION_STATE_STORE), SESSION_STATE_STORE)
                .selectKey(...)
                .groupByKey(...);

    KTable<Windowed<MediaKey>, AggregatedData> aggregatedMedia = mediaData
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .aggregate(
                        new Initializer<AggregatedData>() {...},
                        new Aggregator<MediaKey, AggregatedData, AggregatedData>() {
                            @Override
                            public AggregatedData apply(MediaKey key, AggregatedData input, AggregatedData aggregated) {
                                // ... Add stuff to "aggregated"
                                return aggregated;
                            }
                        },
                        Materialized.<MediaKey, AggregatedData, WindowStore<Bytes, byte[]>>as("aggregated-media")
                                .withValueSerde(aggregatedDataSerde)
               );

    aggregatedMedia.toStream()
            .map(new KeyValueMapper<Windowed<MediaKey>, AggregatedData, KeyValue<MediaKey, PostgresOutput>>() {
                @Override
                public KeyValue<MediaKey, PostgresOutput> apply(Windowed<MediaKey> mediaidKey, AggregatedData data) {
                        // ... Some re-formatting and then
                        return new KeyValue<>(mediaidKey.key(), output);
                }
            })
            .to(POSTGRES_TOPIC, Produced.with(mediaKeySerde, postgresSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

    streams.start();

    // ... Shutdown hook
}
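(Side note: on Kafka Streams 3.0+, StreamsConfig.EXACTLY_ONCE is deprecated. If the brokers are on 2.5+, the newer guarantee can be swapped in; it uses a single producer per stream thread instead of one per task, which also makes rebalances cheaper:)

// Kafka Streams 3.0+ / brokers 2.5+: drop-in replacement for EXACTLY_ONCE.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);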

and:

public class SessionProcessingSupplier implements TransformerSupplier<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> {
    private final String sessionStateStoreName;

    public SessionProcessingSupplier(String sessionStateStoreName) {
        this.sessionStateStoreName = sessionStateStoreName;
    }

    @Override
    public Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>> get() {
        return new Transformer<String, Processing.IncomingData, KeyValue<String, Processing.AggregatedData>>() {
            private ProcessorContext context;
            private KeyValueStore<String, Processing.SessionState> stateStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext processorContext) {
                this.context = processorContext;
                this.stateStore = (KeyValueStore<String, Processing.SessionState>) context.getStateStore(sessionStateStoreName);
            }

            @Override
            public KeyValue<String, Processing.AggregatedData> transform(String sessionid, Processing.IncomingData data) {
                Processing.SessionState state = this.stateStore.get(sessionid);
                // ... Update or create session state
                return new KeyValue<String, Processing.AggregatedData>(sessionid, output);
            }

            @Override
            public void close() {}
        };
    }
}
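To diff the live aggregation state against what reaches Postgres, the window store can also be queried directly via interactive queries. A sketch, assuming Kafka Streams 2.5+ for StoreQueryParameters:

// Sketch: dump the current contents of the "aggregated-media" window store so
// the in-store counts can be compared with the rows that land in Postgres.
static void dumpAggregatedStore(KafkaStreams streams) {
    ReadOnlyWindowStore<MediaKey, AggregatedData> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                    "aggregated-media", QueryableStoreTypes.windowStore()));
    try (KeyValueIterator<Windowed<MediaKey>, AggregatedData> all = store.all()) {
        while (all.hasNext()) {
            KeyValue<Windowed<MediaKey>, AggregatedData> entry = all.next();
            System.out.println(entry.key + " -> " + entry.value);
        }
    }
}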