Creating a GlobalKTable using only a subset of topic columns

We use Kafka Streams to process one input stream and one compacted topic containing client data. In our stream processing we consume the former and join it with the latter using a GlobalKTable, something like:

    StreamsBuilder builder = new StreamsBuilder();
    
    GlobalKTable<String, Client> clients
        = builder.globalTable(CLIENT_TOPIC, Consumed.with(Serdes.String(), clientSerde));
            
    KStream<String, Foo> foos = builder.stream(FOO_TOPIC);
    
    KStream<String, Bar> bars = foos
                    .leftJoin(
                            clients,
                            (streamKey, streamValue) -> streamValue.getClientId().toString(),
                            new FooClientJoiner()
                    );

This basically works, but of course the entire CLIENT_TOPIC is materialized in local state (we use RocksDB as the state store, so it sits on disk), taking up a nontrivial amount of space, and this is becoming a problem.

We do know, however, that we use only one column of the client data, so if we had something like

    StreamsBuilder builder = new StreamsBuilder();

    // GlobalKTable from reduced topic (toGlobalKTable() is wishful: no such method exists)
    GlobalKTable<String, String> reducedClients
        = builder.stream(CLIENT_TOPIC)
            .map((key,value) -> new KeyValue<String, String>(key, value.getTheOneColummnINeed()))
            .toGlobalKTable();

    KStream<String, Foo> foos = builder.stream(FOO_TOPIC);

    KStream<String, Bar> bars = foos
                    .leftJoin(
                            reducedClients,
                            (streamKey, streamValue) -> streamValue.getClientId().toString(),
                            new FooReducedClientsJoiner()
                    );

it would solve our problem. Creating a GlobalKTable from a stream like this is not possible, but is there an equivalent? Or is there any other way to reduce the space taken by the materialized CLIENT_TOPIC, given that we need only a small subset of the data?

I have also tried to manually amend the generated Client class (so that it contains only what I need), but this did not affect the resulting size of the stored GlobalKTable.

Btw, playing with the RocksDB compression settings did not help much.

Answer by Lucas Brutschy

You can use Kafka Streams to write the single column to a separate topic:

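    // Project each client record down to the one column that is needed and
    // write it to a separate (ideally compacted) topic. Note that to()
    // returns void, so its result cannot be assigned to a variable.
    builder.stream(CLIENT_TOPIC, Consumed.with(Serdes.String(), clientSerde))
        // mapValues keeps the key unchanged; nulls are forwarded so that
        // tombstones in CLIENT_TOPIC still delete entries downstream
        .mapValues(value -> value == null ? null : value.getTheOneColummnINeed())
        // clientColumnSerde: serde for the column type (Serdes.String() for a String column)
        .to(REDUCED_TOPIC, Produced.with(Serdes.String(), clientColumnSerde));

    KStream<String, Foo> foos = builder.stream(FOO_TOPIC);

    // The global table now holds only the single column per client
    GlobalKTable<String, String> clients
        = builder.globalTable(REDUCED_TOPIC, Consumed.with(Serdes.String(), clientColumnSerde));

    KStream<String, Bar> bars = foos
        .leftJoin(
                clients,
                (streamKey, streamValue) -> streamValue.getClientId().toString(),
                new FooReducedClientsJoiner()
        );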
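
To make the semantics of the reduced table concrete, here is a minimal plain-Java model with no Kafka dependency. It is only a sketch: the `Client` and `Foo` classes, the `tier` column, and the `"unknown"` fallback are hypothetical stand-ins, and a `HashMap` plays the role of the table. It shows the two things the joiner has to cope with: a null value (tombstone) removes a client, and a left join still emits a result when no client matches.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the reduced table; this is not Kafka Streams code.
class ReducedTableSketch {

    // Hypothetical stand-ins for the Client and Foo types.
    static final class Client {
        final String name, address, tier;
        Client(String name, String address, String tier) {
            this.name = name; this.address = address; this.tier = tier;
        }
    }

    static final class Foo {
        final String clientId, payload;
        Foo(String clientId, String payload) {
            this.clientId = clientId; this.payload = payload;
        }
    }

    // Applies one record of the client topic to the reduced table.
    // Only the "tier" column is kept; a null value is a tombstone and deletes the key.
    static void apply(Map<String, String> table, String key, Client value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value.tier);
        }
    }

    // Left join: a result is produced even when no client matches,
    // in which case the table side is null.
    static String leftJoin(Map<String, String> table, Foo foo) {
        String tier = table.get(foo.clientId);
        return foo.payload + " -> " + (tier == null ? "unknown" : tier);
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        apply(table, "c1", new Client("Ann", "1 Main St", "gold"));
        apply(table, "c2", new Client("Bob", "2 Side St", "silver"));
        apply(table, "c2", null); // tombstone removes c2

        System.out.println(leftJoin(table, new Foo("c1", "order-1"))); // order-1 -> gold
        System.out.println(leftJoin(table, new Foo("c2", "order-2"))); // order-2 -> unknown
    }
}
```

The table here stores one short string per client instead of the whole record, which is the point of writing the projected column to its own topic.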

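Alternatively, you should consider co-partitioning the two inputs so that you can use a regular (non-global) KTable for the join: re-key the foo stream by client id (for example with selectKey) so that each foo record lands on the same partition as its client. Each application instance then stores only its share of the client data rather than a full copy.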
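
Why re-keying matters for co-partitioning can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Kafka code: `partitionFor` uses `hashCode()` as a stand-in for a real partitioner, and the client ids and values are made up. The idea it demonstrates is that a partitioned table gives each instance only a slice of the data, so a lookup is guaranteed to succeed only if the stream record is keyed the same way as the table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of co-partitioning; this is not Kafka Streams code.
class CoPartitioningSketch {

    // Stand-in partitioner (ASSUMPTION): a real partitioner hashes the
    // serialized key bytes; hashCode() just keeps the sketch dependency-free.
    // The principle is the same: equal keys map to the same partition
    // as long as the partition counts match.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Splits a table into one local slice per partition, the way a
    // partitioned (non-global) table is spread across instances.
    static List<Map<String, String>> partitionTable(Map<String, String> table, int numPartitions) {
        List<Map<String, String>> slices = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            slices.add(new HashMap<>());
        }
        table.forEach((key, value) -> slices.get(partitionFor(key, numPartitions)).put(key, value));
        return slices;
    }

    // Looks the client up only in the slice owned by the partition
    // on which the stream record (with key recordKey) arrives.
    static String localLookup(List<Map<String, String>> slices, String recordKey, String clientId) {
        int partition = partitionFor(recordKey, slices.size());
        return slices.get(partition).get(clientId);
    }

    public static void main(String[] args) {
        Map<String, String> clients = new HashMap<>();
        clients.put("c1", "gold");
        clients.put("c2", "silver");
        clients.put("c3", "bronze");
        List<Map<String, String>> slices = partitionTable(clients, 4);

        // Re-keyed by client id: the record arrives where the client lives.
        System.out.println(localLookup(slices, "c2", "c2")); // silver

        // Keyed by something else: found only if both keys happen to share
        // a partition, so the result may be null.
        System.out.println(localLookup(slices, "foo-7", "c2"));
    }
}
```

The trade-off compared with the GlobalKTable approach is an extra repartitioning step for the stream in exchange for storing each client only once across the whole application.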