How to configure a Kafka topic in a Spring Boot project that uses Flowable


With Flowable and Kafka enabled, the application runs. But as soon as I configure a ConcurrentKafkaListenerContainerFactory bean, startup fails with this error: "'org.springframework.kafka.core.KafkaOperations' that could not be found. Consider defining a bean of type 'org.springframework.kafka.core.KafkaOperations' in your configuration."

I use the same configuration in another project without Flowable, and there it runs without any problem.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
createRecurrenceCommandListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {

    ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(errorHandler);
    factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
    factory.setReplyTemplate(kafkaTemplate());
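    // A RecordFilterStrategy returns true for records that should be discarded.
    // Needs org.apache.kafka.common.header.Header and java.nio.charset.StandardCharsets.
    factory.setRecordFilterStrategy(consumerRecord -> {
        Header header = consumerRecord.headers().lastHeader(KafkaCustomHeaders.MESSAGE_TYPE);
        if (header == null || header.value() == null) {
            return true; // no message-type header: discard instead of throwing a NullPointerException
        }
        String messageType = new String(header.value(), StandardCharsets.UTF_8);
        return !ConsumedMessageType.CREATE_RECURRENCE.toString().equals(messageType);
    });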
    return factory;
}


public ConsumerFactory<String, CreateRecurrenceCommand> createRecurrenceCommandConsumerFactory() {
    return generateConsumerFactory(CreateRecurrenceCommand.class, AppConstants.CREATE_RECURRENCE);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

// <T> is declared on the method so that the Class<T> argument determines the value type.
private <T> ConsumerFactory<String, T> generateConsumerFactory(Class<T> clazz, String groupId) {
    Map<String, Object> props = new HashMap<>(defaultConsumerFactory.getConfigurationProperties());

    props.put(
            ConsumerConfig.GROUP_ID_CONFIG,
            groupId);
    ErrorHandlingDeserializer<T> errorHandlingDeserializer = new ErrorHandlingDeserializer<>((topic, data) -> {
        try {
            if (data == null) {
                return null;
            }
            return objectMapper.readValue(new String(data), clazz);
        } catch (JsonProcessingException e) {
            throw new DeserializationException("Failed to deserialise", data, false, e);
        }
    });
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
}

With this code I expected the listener to filter messages so that only those of type CREATE_RECURRENCE are processed.
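The filtering rule itself can be checked in isolation, without a broker. The following is only a minimal sketch in plain Java: the class name MessageTypeFilter, the method shouldDiscard and the constant EXPECTED_TYPE are hypothetical stand-ins (they are not part of the question's code), and discarding records that have no message-type header is an assumption. It mirrors the RecordFilterStrategy convention that returning true means "discard this record".

```java
import java.nio.charset.StandardCharsets;

public class MessageTypeFilter {

    // Hypothetical stand-in for ConsumedMessageType.CREATE_RECURRENCE.toString().
    static final String EXPECTED_TYPE = "CREATE_RECURRENCE";

    // Same convention as RecordFilterStrategy: true means the record is discarded.
    static boolean shouldDiscard(byte[] headerValue) {
        if (headerValue == null) {
            return true; // assumption: a record without the header is not wanted
        }
        String messageType = new String(headerValue, StandardCharsets.UTF_8);
        return !EXPECTED_TYPE.equals(messageType);
    }

    public static void main(String[] args) {
        byte[] wanted = "CREATE_RECURRENCE".getBytes(StandardCharsets.UTF_8);
        byte[] other = "SOMETHING_ELSE".getBytes(StandardCharsets.UTF_8);

        System.out.println(shouldDiscard(wanted)); // false: record is kept
        System.out.println(shouldDiscard(other));  // true: record is discarded
        System.out.println(shouldDiscard(null));   // true: missing header
    }
}
```

The null check matters because lastHeader(...) returns null when a record carries no header with that key.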

1 answer
Artem Bilan

I'm not sure what "flowable" is, but it looks like you are trying to configure everything yourself instead of relying on Spring Boot auto-configuration. In that case, consider renaming your ConcurrentKafkaListenerContainerFactory bean to kafkaListenerContainerFactory.
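A rough sketch of what that rename could look like, as far as I understand the setup. The class name KafkaListenerConfig and the generic types are assumptions for illustration, and the ConsumerFactory and KafkaTemplate beans are assumed to be defined elsewhere in your configuration. The idea is that Spring Boot's own listener container factory is conditional on no bean named kafkaListenerContainerFactory being present, so a user-defined bean with that name should make the auto-configured one back off.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaListenerConfig { // hypothetical class name

    // The bean name is the point here: "kafkaListenerContainerFactory" is also
    // the default container factory name that @KafkaListener looks up.
    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory, // assumed to be defined elsewhere
            KafkaTemplate<String, Object> kafkaTemplate) {   // assumed to be defined elsewhere

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setReplyTemplate(kafkaTemplate);
        return factory;
    }
}
```

If several container factories are needed, the others can keep their own names and be selected per listener with the containerFactory attribute of @KafkaListener.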