We have a Kafka Streams app doing a KStream-KTable inner join. Both topics are high volume, with 256 partitions each. The app is currently deployed on 8 nodes with 8 GB heap each. We see that the heap memory keeps growing constantly and eventually an OOM happens. I am not able to get a heap dump because the app runs in a container that gets killed when that happens, but I have tried a few things that give me confidence the issue is related to the state stores / KTable. Without the RocksDBConfigSetter below, memory gets used up pretty quickly; with it, the growth is slowed down to some extent. I need some guidance on how to proceed further, thanks.
I added the following three properties:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1 * 1024L); // 1 KB record cache, i.e. record caching effectively disabled
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
public static class CustomRocksDBConfig implements RocksDBConfigSetter {

    // 1 MB block cache per store instance (not shared across stores)
    private final org.rocksdb.Cache cache = new org.rocksdb.LRUCache(1 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        log.info("In CustomRocksDBConfig");
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(1 * 1024L);            // 1 KB block size
        tableConfig.setCacheIndexAndFilterBlocks(true); // count index/filter blocks against the block cache
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);             // at most 2 memtables per store
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}
You could try to limit the memory usage of RocksDB across all RocksDB instances on one node. To do so, you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared WriteBufferManager and count its memory against the block cache, and then pass the same Cache object to each instance. You can find more details and a sample configuration under https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html#rocksdb
With such a setup you can specify a soft upper bound for the total off-heap memory used by all RocksDB state stores on one single instance (TOTAL_OFF_HEAP_MEMORY in the sample configuration) and then specify how much of that memory is used for writing to and reading from the state stores on that node (TOTAL_MEMTABLE_MEMORY and INDEX_FILTER_BLOCK_RATIO in the sample configuration, respectively).
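A minimal sketch of what such a bounded-memory config setter could look like, following the linked documentation; the budget constants (TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY) are placeholder values you would need to tune for your workload:

public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Placeholder budgets -- tune for your workload.
    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;  // shared bound for all stores on this instance
    private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;            // fraction of the cache reserved for index/filter blocks
    private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L;  // memtable budget, counted against the cache

    // static: one Cache and one WriteBufferManager shared by every RocksDB instance in this JVM
    private static final org.rocksdb.Cache cache =
        new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    private static final org.rocksdb.WriteBufferManager writeBufferManager =
        new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

        // Count memtable memory against the block cache and cache index/filter blocks in it,
        // so the block cache capacity becomes the effective bound for RocksDB memory.
        options.setWriteBufferManager(writeBufferManager);
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);

        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Do NOT close the cache or write buffer manager here -- they are shared by all stores.
    }
}

You would register it via ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, just as you do with your CustomRocksDBConfig.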
Since all values are app- and workload-specific, you need to experiment with them and monitor the RocksDB state stores with the metrics provided by Kafka Streams.
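For example, the statistics-based RocksDB metrics are only recorded at the DEBUG metrics recording level, so while you investigate you could enable that on the same properties object from your snippet:

properties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG"); // record statistics-based RocksDB metrics per state store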
Guidance on how to handle RocksDB issues in Kafka Streams can be found under:
https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/
Especially for your case, the following section might be interesting:
https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/#high-memory-usage