Join a one-to-many relation with Spring Cloud Stream and Kafka Streams


I'm trying to join data from two topics, person and address, where one person can have multiple addresses. The data published to the topics looks like this:

//person with id as key
{"id": "123", "name": "Tom Tester"}

//addresses with id as key
{"id": "321", "person_id": "123", "address": "Somestreet 12, 4321 Somewhere"}
{"id": "432", "person_id": "123", "address": "Otherstreet 12, 5432 Nowhere"}

After the join I would like to have an aggregated output (to be indexed in Elasticsearch) that looks something like this:

{
  "id": "123",
  "name": "Tom Tester",
  "addresses": [
    {
      "id": "321",
      "address": "Somestreet 12, 4321 Somewhere"
    },
    {
      "id": "432",
      "address": "Otherstreet 12, 5432 Nowhere"
    }
  ]
}
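
To make the target shape concrete, here is a minimal plain-Java sketch (no Kafka involved) that groups addresses by person_id and attaches each group to its person. The names JoinShapeSketch, Person, Address, PersonView and aggregate are made up for illustration; they are not the classes from my application.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical types for illustration only; not the application's real classes.
class JoinShapeSketch {

    record Person(String id, String name) {}

    record Address(String id, String personId, String address) {}

    // Aggregated output: one person plus all addresses that reference it.
    record PersonView(String id, String name, List<Address> addresses) {}

    // Groups addresses by personId, then attaches each group to its person.
    static List<PersonView> aggregate(List<Person> persons, List<Address> addresses) {
        Map<String, List<Address>> byPerson = new LinkedHashMap<>();
        for (Address a : addresses) {
            byPerson.computeIfAbsent(a.personId(), k -> new ArrayList<>()).add(a);
        }
        List<PersonView> result = new ArrayList<>();
        for (Person p : persons) {
            // Left-join behaviour: a person without addresses still appears, with an empty list.
            result.add(new PersonView(p.id(), p.name(), byPerson.getOrDefault(p.id(), List.of())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<PersonView> views = aggregate(
                List.of(new Person("123", "Tom Tester")),
                List.of(new Address("321", "123", "Somestreet 12, 4321 Somewhere"),
                        new Address("432", "123", "Otherstreet 12, 5432 Nowhere")));
        System.out.println(views);
    }
}
```

This only shows the one-off grouping; it says nothing about how updates should propagate, which is the part I'm struggling with.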

Whenever the person or address topic receives an update, the aggregated person should be updated as well. So far I only get an updated aggregated person when an address is published, not when the person itself changes. Any ideas what is wrong with this code?

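import java.util.Objects;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication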
@EnableBinding(PersonAggregatorBinding.class)
public class KafkaStreamTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamTestApplication.class, args);
    }

    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamTestApplication.class);

    @StreamListener
    @SendTo("person-aggregation")
    public KStream<String, PersonAggregation> process(
            @Input("person-input") KTable<String, Person> personInput,
            @Input("address-input") KTable<String, Address> addressInput) {
        KTable<String, AddressAggregation> addressAggregate = addressInput.toStream()
                .peek((key, value) -> LOG.info("addr {}: {}", key, value))
                .groupBy((k, v) -> v.getPersonId(), Grouped.with(null, new AddressSerde()))
                .aggregate(
                        AddressAggregation::new,
                        (key, value, aggregation) -> {
                            aggregate(aggregation, value);
                            return aggregation;
                        }, Materialized.with(Serdes.String(), new AddressAggregationSerde()));

        addressAggregate.toStream()
                .peek((key, value) -> LOG.info("aggregated addr: {}", value));

        return personInput.toStream()
                .leftJoin(addressAggregate, this::join, Joined.with(Serdes.String(), new PersonSerde(), new AddressAggregationSerde()))
                .peek((key, value) -> LOG.info("aggregated person: {}", value));
    }

    private PersonAggregation join(Person person, AddressAggregation addrs) {
        return PersonAggregation.builder()
                .id(person.getId())
                .name(person.getName())
                .addresses(addrs)
                .build();
    }

    // Upserts the address into the aggregation by id; an invalid address is only removed.
    public void aggregate(AddressAggregation aggregation, Address address) {
        if (address != null) {
            aggregation.removeIf(it -> Objects.equals(it.getId(), address.getId()));
            if (address.isValid()) {
                aggregation.add(address);
            }
        }
    }
}
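
As far as I understand, a KStream-KTable join in Kafka Streams is only evaluated when a record arrives on the stream side, whereas a KTable-KTable join is re-evaluated when either side changes. The behaviour I'm after corresponds to the latter. Below is a minimal plain-Java model of that expectation, not a Kafka Streams implementation; TwoTableJoinModel and all of its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the expected behaviour; hypothetical names, no Kafka APIs.
class TwoTableJoinModel {

    // personId -> name
    private final Map<String, String> personNames = new HashMap<>();
    // personId -> (addressId -> address)
    private final Map<String, Map<String, String>> addresses = new HashMap<>();
    // every join result, in emission order
    private final List<String> emitted = new ArrayList<>();

    // An update on the person side re-evaluates the join for that key.
    void upsertPerson(String personId, String name) {
        personNames.put(personId, name);
        emit(personId);
    }

    // An update on the address side re-evaluates the join for the owning person.
    void upsertAddress(String addressId, String personId, String address) {
        addresses.computeIfAbsent(personId, k -> new LinkedHashMap<>()).put(addressId, address);
        emit(personId);
    }

    // Left join: emits only when the person exists; the address list may be empty.
    private void emit(String personId) {
        String name = personNames.get(personId);
        if (name == null) {
            return;
        }
        Map<String, String> addrs = addresses.getOrDefault(personId, Map.of());
        emitted.add(name + " " + addrs.values());
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TwoTableJoinModel model = new TwoTableJoinModel();
        model.upsertPerson("123", "Tom Tester");
        model.upsertAddress("321", "123", "Somestreet 12, 4321 Somewhere");
        model.upsertPerson("123", "Tom T. Tester"); // a person change must also emit
        model.emitted().forEach(System.out::println);
    }
}
```

In this model every call to upsertPerson or upsertAddress produces a new aggregated result for the affected person, which is what I would like the topology above to do.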