With Spring Cloud Stream and the Kafka Streams binder, I would like to process the output of a function in another one, as in:
@Bean
public Function<KStream<String, Double>, KStream<String, Double>> sqrt() {
return numbers -> numbers.mapValues(Math::sqrt);
}
@Bean
public Consumer<KStream<String, Double>> log() {
return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value));
}
where sqrt() outputs the square root of a number, which is then logged with log(). application.yaml therefore looks like this:
spring:
cloud:
stream:
function:
bindings:
sqrt-in-0: numbers
sqrt-out-0: sqrt-numbers
log-in-0: sqrt-numbers
kafka:
streams:
bindings:
sqrt:
consumer:
application-id: sqrtApplicationId
log:
consumer:
application-id: logApplicationId
When starting the application, I get the following error:
The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
Now of course setting definition-overriding to true is not a proper fix, and it will fail with an IllegalStateException.
How do I solve this?
A reproduction of the problem can be found here: https://github.com/cedric-schaller/dltawareprocessor-type-error
Assuming that you have two Kafka topics called,
numbersandsqrt-numbers, the following configuration should work.You can use
spring.cloud.stream.function.bindings..to override a default binding name. For example, if you want to change the binding name fromsqrt-in-0toinput, you can do it likespring.cloud.stream.function.bindings.sqrt-in-0: input. You still need to setdestinationon the overridden binding though (viaspring.cloud.stream.bindings.input.destination).The particular exception you are getting is because you are trying to reuse an already created binding name -
sqrt-numbers.