I'm running a (relatively) simple KStreams app:
stream->aggregate by key->filter->foreach
It processes ~200K records/minute, running on an AWS EC2 instance with 32 GB RAM / 8 CPUs.
Within 10 minutes of starting it, memory usage exceeds 40%. Not long after (typically in under 15 minutes) the OS OOM-kills the process.
Configuration:
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "450000");   // 7.5 minutes
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250);            // cap records per poll
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class.getName());  // custom event-time extractor
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");          // 2 standby replicas per state store
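For completeness, this is roughly how that config is assembled and the app started. The class name, application id, broker address, and serde settings below are placeholders rather than the real values; a minimal sketch against the 0.10.1 API:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class AggApp {
    public static void main(String[] args) {
        Properties config = new Properties();
        // placeholder values -- not the real application id / brokers / serdes
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-agg-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // ...plus the consumer/producer/streams settings listed above...

        KStreamBuilder builder = new KStreamBuilder();
        // topology (stream -> aggregate by key -> filter -> foreach) is built here

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}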
Aggregation step:
KTable<Windowed<String>, String> ktAgg = sourceStream.groupByKey().aggregate(
        String::new,        // initializer: empty String aggregate
        new Aggregate(),    // custom Aggregator producing the String aggregate
        // 20-minute windows advancing every 5 minutes, retained for 40 minutes
        TimeWindows.of(20 * 60 * 1000L).advanceBy(5 * 60 * 1000L).until(40 * 60 * 1000L),
        stringSerde, "table_stream");
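The filter/foreach half of the pipeline mentioned at the top isn't shown above; one way that shape can be written against the windowed KTable is below. The predicate and the foreach body are hypothetical stand-ins, not the actual logic:

// Convert the windowed KTable back to a changelog stream, then filter and act on each record.
// The predicate and the println in the foreach are placeholders for the real filter/action.
ktAgg.toStream()
     .filter((windowedKey, aggValue) -> aggValue != null && !aggValue.isEmpty())
     .foreach((windowedKey, aggValue) ->
             System.out.println(windowedKey.key() + " @ " + windowedKey.window().start() + " -> " + aggValue));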
Using Kafka 0.10.1.1
Suggestions on where to look for the culprit?
Side note:
I tried instrumenting this app with the NewRelic Java agent. When I ran it with -XX:+UseG1GC, it did the standard "use lots of memory and then get killed"; but when I removed the G1GC flag, the process drove the system load above 21 and I had to kill that one myself.
What output there was from NewRelic didn't show anything outrageous with respect to memory management.