@KafkaListener: generate clientId and groupId by incoming parameters

2k views Asked by At

I have been using @KafkaListeners for the sort of Kafka event consumers that must have unique ids and every event consumer must have a separate group with a very special name. For example:

@KafkaListener(id="AmazingProductEventConsumer",
               topics = "${kafka......topic}",
               clientIdPrefix = "AmazingProductEventConsumerClientId",
               groupId = "${kafka.group.id.prefix}-${environemnt.id}-${application.name}-${kafka.......topic}-#{T(java.util.UUID).randomUUID().toString()}")
public class AmazingProductEventConsumer {
   ... methods...
}

or with Batch event listener:

public class ProductBatchEventConsumer {

   @KafkaListener(id="ProductBatchEventConsumer ",
               topics = "${kafka......topic}",
               clientIdPrefix = "ProductBatchEventConsumerClientId",
               groupId = "${kafka.group.id.prefix}-${environemnt.id}-${application.name}-${kafka.......topic}-#{T(java.util.UUID).randomUUID().toString()}")
   public void batchEventConsumer(List<Record> records) {
     .... 
   }
}

The Application is pretty large, so many consumers subscribed to the same topic, but in general, about 500-600 consumers subscribed to ~180-200 topics in multiple services. I want to avoid a boilerplate code and extract common patterns that could be used for generating these parameters, except for topics and partitions.

In Spring Boot 2.2 I used a separated BeanPostProcessor for this purpose and generated needed fields before initialization and replaced KafkaListener annotation for consumers with needed instances, but in Spring Boot 2.5.0 this feature no longer available, since KafkaListenerAnnotationBeanPostProcessor uses KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz, KafkaListener.class); that synthesizes new KafkaListeners based firstly declared annotation properties.

I am curious, is there any legal way on how to generate @KafkaListener's field id, clientId, groupId, and other fields that could be extracted from other listeners by a common pattern.

1

There are 1 answers

8
Gary Russell On

You can use much simpler SpEL.

groupId = "#{@someBean.groupId}"

and put the placeholders and UUID generation in getGroupId() on that bean.

And you can use a custom annotation meta-annotated with @KafkaListener and your props to avoid boiler plate.

Or, if you have some other variability on each listener (other than the UUID), use

#{@someBean.groupId('${small.property}'})