I'm trying to join data from two topics person and address where one person can have multiple addresses. The data published into the topics look like the following:
//person with id as key
{"id": "123", "name": "Tom Tester"}
//addresses with id as key
{"id": "321", "person_id": "123", "address": "Somestreet 12, 4321 Somewhere"}
{"id": "432", "person_id": "123", "address": "Otherstreet 12, 5432 Nowhere"}
After the join I would like to have an aggregated output (to be indexed in elasticsearch) which should look something like this:
{
"id": "123",
"name": "Tom Tester",
"addresses": [
{
"id": "321",
"address": "Somestreet 12, 4321 Somewhere"
},
{
"id": "432",
"address": "Otherstreet 12, 5432 Nowhere"
}
]
}
Whenever person or address topic gets an update the aggregated person should also be updated. Currently I achieved to get updates on the aggregated person only when addresses are published but not when the person itself is changed. Any ideas what is wrong with this code?
@SpringBootApplication
@EnableBinding(PersonAggregatorBinding.class)
public class KafkaStreamTestApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamTestApplication.class, args);
}
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamTestApplication.class);
@StreamListener
@SendTo("person-aggregation")
public KStream<String, PersonAggregation> process(
@Input("person-input") KTable<String, Person> personInput,
@Input("address-input") KTable<String, Address> addressInput) {
KTable<String, AddressAggregation> addressAggregate = addressInput.toStream()
.peek((key, value) -> LOG.info("addr {}: {}", key, value))
.groupBy((k, v) -> v.getPersonId(), Grouped.with(null, new AddressSerde()))
.aggregate(
AddressAggregation::new,
(key, value, aggregation) -> {
aggregate(aggregation, value);
return aggregation;
}, Materialized.with(Serdes.String(), new AddressAggregationSerde()));
addressAggregate.toStream()
.peek((key, value) -> LOG.info("aggregated addr: {}", value));
return personInput.toStream()
.leftJoin(addressAggregate, this::join, Joined.with(Serdes.String(), new PersonSerde(), new AddressAggregationSerde()))
.peek((key, value) -> LOG.info("aggregated person: {}", value));
}
private PersonAggregation join(Person person, AddressAggregation addrs) {
return PersonAggregation.builder()
.id(person.getId())
.name(person.getName())
.addresses(addrs)
.build();
}
public void aggregate(AddressAggregation aggregation, Address address) {
if(address != null){
aggregation.removeIf(it -> Objects.equals(it.getId(), address.getId()));
if(address.isValid()) {
aggregation.add(address);
}
}
}
}