Kafka Streams - Disable creation of internal topic "changelog"

97 views Asked by At

The following code snippet is intended to join events from a compact topic "customers.general.v0" with a topic "orders.v0" and produce a new object (with enriched data).

@Component("CustomerOrderEnricherBinding")
@RequiredArgsConstructor
public class CustomerOrderEnricherTopology implements
    Function<KStream<String, Order>, Function<KTable<CustomerId, Customer>, KStream<CustomerId, CustomerOrderEnriched>>> {
  private final Serde<Order> orderSerde;
  private final Serde<Customer> customerSerde;
  private final Serde<CustomerId> customerIdSerde;

  public Function<KTable<CustomerId, Customer>, KStream<CustomerId, CustomerOrderEnriched>> apply(KStream<String, Order> orderKStream) {
    return customerKTable -> {
        return orderKStream.map((keyOrderCode, orderValue) -> KeyValue.pair( CustomerId.of(orderValue.getCustomer()), orderValue ))
                           .join(customerKTable, new CustomerOrderEnricherJoiner(), Joined.with(customerIdSerde, orderSerde, customerSerde));
    };
  }
}
// Versions:
//   Spring Cloud: "2022.0.3"
//   Spring Cloud Stream Binder Kafka Streams: "4.0.3"
//    Kafka Streams: 3.4.1

The Kafka Streams lib, creates internal topics known as "changelog" and "repartition" (to achieve fault tolerant capability).

Question: There is a way to prevent the creation of "changelog" and only use compact topic for the join?

(If Materialized.withLoggingDisabled() helps, I don't have the knowledge to use it and if anyone knows and can help, I would greatly appreciate it).

Thanks!

0

There are 0 answers