I want to send events and messages of different object types using Kafka topics. So I have producer configuration class in which I create the beans of the class KafkaTemplate and ProducerFactory:
@Bean
public KafkaTemplate<String, BeltMessage> kafkaBeltMessageTemplate(final ProducerFactory<String, BeltMessage> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaTemplate<String, String> kafkaStringTemplate(final ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaTemplate<String, BeltEventDescription> kafkaBeltEventDescriptionTemplate(final ProducerFactory<String, BeltEventDescription> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaTemplate<String, BeltItemEvent> kafkaBeltItemEventTemplate(final ProducerFactory<String, BeltItemEvent> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public ProducerFactory<String, BeltMessage> producerBeltMessageFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
@Bean
public ProducerFactory<String, String> producerStringFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
@Bean
public ProducerFactory<String, BeltEventDescription> producerBeltEventDescriptionFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
@Bean
public ProducerFactory<String, BeltItemEvent> producerBeltItemEventFactory() {
return new DefaultKafkaProducerFactory<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
));
}
Now, it's the same implementation for everyone of them and it's just code duplicated for each bean separately, the only change is the template parameter.
Is there a way to create a generic bean like KafkaTemplate<String, Object>? I tried to abstract out an interface from BeltItemEvent and BeltEventDescription, but then I got an error in runtime because it could not be serialized by Kafka.
Servlet.service()for servlet [dispatcherServlet]in context with path [] threw exception [Request processing failed:org.apache.kafka.common.errors.SerializationException: Can't convert value of classcom.example.events.BeltEventDescriptionto classorg.apache.kafka.common.serialization.StringSerializerspecified invalue.serializer] with root causejava.lang.ClassCastException: classcom.example.events.BeltEventDescriptioncannot be cast to classjava.lang.String(com.example.events.BeltEventDescriptionis in unnamed module of loader 'app';java.lang.Stringis in modulejava.baseof loader 'bootstrap')
But there is a lot of code duplication in case I want to create new events that declare the various contracts I might need in the future.