The following code snippet is intended to join events from a compact topic "customers.general.v0" with a topic "orders.v0" and produce a new object (with enriched data).
@Component("CustomerOrderEnricherBinding")
@RequiredArgsConstructor
public class CustomerOrderEnricherTopology implements
Function<KStream<String, Order>, Function<KTable<CustomerId, Customer>, KStream<CustomerId, CustomerOrderEnriched>>> {
private final Serde<Order> orderSerde;
private final Serde<Customer> customerSerde;
private final Serde<CustomerId> customerIdSerde;
public Function<KTable<CustomerId, Customer>, KStream<CustomerId, CustomerOrderEnriched>> apply(KStream<String, Order> orderKStream) {
return customerKTable -> {
return orderKStream.map((keyOrderCode, orderValue) -> KeyValue.pair( CustomerId.of(orderValue.getCustomer()), orderValue ))
.join(customerKTable, new CustomerOrderEnricherJoiner(), Joined.with(customerIdSerde, orderSerde, customerSerde));
};
}
}
// Versions:
// Spring Cloud: "2022.0.3"
// Spring Cloud Stream Binder Kafka Streams: "4.0.3"
// Kafka Streams: 3.4.1
The Kafka Streams lib, creates internal topics known as "changelog" and "repartition" (to achieve fault tolerant capability).
Question: There is a way to prevent the creation of "changelog" and only use compact topic for the join?
(If Materialized.withLoggingDisabled()
helps, I don't have the knowledge to use it and if anyone knows and can help, I would greatly appreciate it).
Thanks!