Spring Expression Language issue

I have the following class. I have verified in the console that its constructor is called (during bean creation) before the topic placeholder in the Kafka listener is resolved:

public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements MessageReceiver<MSG> {

    @SuppressWarnings("unused")
    private String topic;

    public MsgReceiver(String topic, MessageHandler<MSG> handler) {
        super(handler);
        this.topic = topic;
    }

    @KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
    public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
        System.out.println("Received " + payload);
        super.receiveMessage(headers, payload);
    }

}

I have my application.yml as follows:

my:
  messenger:
    kafka:
      address: localhost:9092
      topics:
        topic_1:
          value: my_topic
          groupId: 1

During bean creation I pass "topic_1", which I want to be used dynamically inside the Kafka listener's topic placeholder. I tried the approach shown in the code above, but it does not work. Please suggest how to do this.

Answer by Gary Russell (best answer):

Placeholders are resolved before SpEL is evaluated; you can't dynamically build a placeholder name using SpEL. Also, you can't reference fields like that; you have to do it indirectly via the bean name (and a public getter).

So, to do what you want, you have to add a getter and get the property dynamically from the environment after building the property name with SpEL.

There is a special token, __listener (supported in SpEL expressions on @KafkaListener since spring-kafka 2.1.2), which allows you to reference the current bean.
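To make the lookup concrete, here is a minimal plain-Java sketch of what such an expression does at runtime: it concatenates the property name from the bean's getter and then asks for that key. It is only an illustration, not Spring code. The Map stands in for Spring's Environment, and the names DynamicKeyLookup, Receiver, buildKey and lookup are hypothetical.

```java
import java.util.Map;

final class DynamicKeyLookup {

    // Hypothetical stand-in for the listener bean; the getter is what
    // an expression such as __listener.topic would read.
    static final class Receiver {
        private final String topic;

        Receiver(String topic) {
            this.topic = topic;
        }

        public String getTopic() {
            return topic;
        }
    }

    // Builds the full property name by string concatenation.
    static String buildKey(Receiver listener, String leaf) {
        return "my.messenger.kafka.topics." + listener.getTopic() + "." + leaf;
    }

    // The Map is an assumption standing in for the Environment;
    // like getProperty, this returns null when the key is absent.
    static String lookup(Map<String, String> environment, Receiver listener, String leaf) {
        return environment.get(buildKey(listener, leaf));
    }

    public static void main(String[] args) {
        // Flattened form of the application.yml entries from the question.
        Map<String, String> environment = Map.of(
                "my.messenger.kafka.topics.topic_1.value", "my_topic",
                "my.messenger.kafka.topics.topic_1.groupId", "1");

        Receiver receiver = new Receiver("topic_1");
        System.out.println(lookup(environment, receiver, "value"));   // prints my_topic
        System.out.println(lookup(environment, receiver, "groupId")); // prints 1
    }
}
```

The key is assembled only once the bean instance exists, which is why the value has to be fetched at expression-evaluation time rather than through a ${...} placeholder.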

Putting it all together...

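import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication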
public class So63056065Application {

    public static void main(String[] args) {
        SpringApplication.run(So63056065Application.class, args);
    }

    @Bean
    public MyReceiver receiver() {
        return new MyReceiver("topic_1");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
    }
}

class MyReceiver {

    private final String topic;

    public MyReceiver(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
            groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
    public void listen(String in) {
        System.out.println(in);
    }

}

Result...

2020-07-23 12:13:44.932  INFO 39561 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 1
    group.instance.id = null
...

and

1: partitions assigned: [my_topic-0]