With Flowable and Kafka enabled, the application runs. But when I configure a ConcurrentKafkaListenerContainerFactory bean, it fails with this error: "'org.springframework.kafka.core.KafkaOperations' that could not be found. Consider defining a bean of type 'org.springframework.kafka.core.KafkaOperations' in your configuration."
I use this same configuration in another project without Flowable, and there it runs without any problem.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
        createRecurrenceCommandListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {
    ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(errorHandler);
    factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
    // Returning true discards the record, so only CREATE_RECURRENCE messages reach the listener.
    factory.setRecordFilterStrategy(consumerRecord -> {
        String messageType = new String(consumerRecord.headers().lastHeader(KafkaCustomHeaders.MESSAGE_TYPE).value());
        return !ConsumedMessageType.CREATE_RECURRENCE.toString().equals(messageType);
    });
    return factory;
}

public ConsumerFactory<String, CreateRecurrenceCommand> createRecurrenceCommandConsumerFactory() {
    return generateConsumerFactory(CreateRecurrenceCommand.class, AppConstants.CREATE_RECURRENCE);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

private <T> ConsumerFactory<String, T> generateConsumerFactory(Class<T> clazz, String groupId) {
    // Start from the default consumer properties and override only the group id.
    Map<String, Object> props = new HashMap<>(defaultConsumerFactory.getConfigurationProperties());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Deserialize the JSON payload into the target class; a null payload stays null.
    ErrorHandlingDeserializer<T> errorHandlingDeserializer = new ErrorHandlingDeserializer<>((topic, data) -> {
        try {
            if (data == null) {
                return null;
            }
            return objectMapper.readValue(new String(data), clazz);
        } catch (JsonProcessingException e) {
            throw new DeserializationException("Failed to deserialise", data, false, e);
        }
    });
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}
With this code I was expecting to filter for one specific message on the topic: CREATE_RECURRENCE.
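For reference, the discard logic of that filter can be sketched in plain Java, without any Kafka types. In Spring Kafka's RecordFilterStrategy, returning true means the record is discarded. The class and method names below are hypothetical, the literal "CREATE_RECURRENCE" assumes that ConsumedMessageType.CREATE_RECURRENCE.toString() returns the enum constant's name, and discarding records that have no message-type header is an assumption added here (the code above would throw a NullPointerException in that case).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original configuration.
class MessageTypeFilter {

    // Returns true when the record should be discarded, mirroring the
    // contract of RecordFilterStrategy.filter(...).
    static boolean shouldDiscard(byte[] headerValue, String expectedType) {
        if (headerValue == null) {
            // Assumption: a record without the message-type header is not ours.
            return true;
        }
        String messageType = new String(headerValue, StandardCharsets.UTF_8);
        return !expectedType.equals(messageType);
    }

    public static void main(String[] args) {
        byte[] match = "CREATE_RECURRENCE".getBytes(StandardCharsets.UTF_8);
        byte[] other = "DELETE_RECURRENCE".getBytes(StandardCharsets.UTF_8);

        System.out.println(shouldDiscard(match, "CREATE_RECURRENCE")); // false: kept
        System.out.println(shouldDiscard(other, "CREATE_RECURRENCE")); // true: discarded
        System.out.println(shouldDiscard(null, "CREATE_RECURRENCE"));  // true: discarded
    }
}
```

Inside the real filter, the byte array would come from consumerRecord.headers().lastHeader(...), checked for null before calling value().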
I'm not sure what "flowable" is, but it looks like you are trying to configure everything yourself instead of relying on Spring Boot auto-configuration. For that purpose, consider changing the name of your ConcurrentKafkaListenerContainerFactory bean to kafkaListenerContainerFactory.
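
A minimal sketch of that rename, assuming the rest of the question's configuration stays as posted. The class name KafkaConfig is hypothetical. CreateRecurrenceCommand, KafkaCreateRecurrenceErrorHandler, createRecurrenceCommandConsumerFactory() and kafkaTemplate() are taken from the question and are not defined here, so this fragment compiles only inside that project. Spring Boot's own listener container factory is conditional on no bean named kafkaListenerContainerFactory being present, and @KafkaListener uses the bean with that name by default.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Hypothetical class name; the question does not show its configuration class.
@Configuration
public class KafkaConfig {

    // The method name is the bean name, so this bean is registered as
    // "kafkaListenerContainerFactory" and Spring Boot's default one backs off.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
            kafkaListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(errorHandler);
        // Both helpers are the question's own methods, assumed unchanged.
        factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        // The record filter from the question would be set here as before.
        return factory;
    }

    // createRecurrenceCommandConsumerFactory(), kafkaTemplate() and
    // generateConsumerFactory(...) stay as in the question.
}
```

Any @KafkaListener that does not set containerFactory explicitly would then pick up this factory, including its record filter, so listeners for other message types may need their own named factory.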