I am doing a PoC with Kafka and the Spring Integration Java DSL. I read a row from a database (DB) and send that row as a message to a Kafka topic. Please find the code below. The code compiles and I am able to fetch the record from the DB, but I don't see any message in the topic.
@Configuration
public class KafkaProduceConfig {

    @Bean
    public IntegrationFlow pollingAdapterFlow(EntityManagerFactory entityManagerFactory, MyTransformer transformer) {
        return IntegrationFlow
                .from(Jpa.inboundAdapter(entityManagerFactory).entityClass(MyRecord.class),
                        e -> e.poller(p -> p.cron("*/1 * * * * *").maxMessagesPerPoll(1).transactional())
                                .autoStartup(true))
                .log(message -> "Polled DB Records from KafkaProduceConfig : " + message.getPayload())
                .split()
                .log(message -> "Record after split : " + message.getPayload())
                .enrichHeaders(hrdSpec -> hrdSpec.headerExpression("myRecord", "payload", true))
                .transform(transformer, "getCustomeRecord")
                .enrichHeaders(hrdSpec -> hrdSpec.headerExpression("customeRecord", "payload", true))
                .log(message -> "Transformed Record : " + message.getPayload() + ",topic :" + message.getHeaders().get("topic"))
                .channel("sendToKafka")
                .get();
    }

    @Bean
    public IntegrationFlow outboundChannelAdapterFlow() {
        return IntegrationFlow.from("sendToKafka")
                .log(message -> "outboundChannelAdapterFlow received payload : " + message.getPayload() + ",topic :"
                        + message.getHeaders().get("topic") + "key :" + message.getHeaders().get("key"))
                .handle(m -> Kafka.outboundChannelAdapter(producerFactory()).topic(m.getHeaders().get("topic").toString())
                        .messageKey(m.getHeaders().get("key").toString())
                        // .headerMapper(mapper())
                        .partitionId((Integer) m.getHeaders().get("partitionId")))
                .get();
    }

    public ProducerFactory<Integer, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}
Expected result: the message should be published to the Kafka topic.
The configuration .handle(m -> Kafka.outboundChannelAdapter(producerFactory()) ... is not correct. That lambda becomes a new MessageHandler whose body merely calls the Kafka factory whenever a new message arrives. The adapter spec it builds is never turned into a handler or invoked, so this code does not actually send the message anywhere.

You must use the handle() variant where you provide the MessageHandler (spec) from the factory directly, not a new handler created by a lambda. So, something like this:
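A minimal sketch of the corrected flow, assuming the function-based topic(), messageKey() and partitionId() options on the spec returned by Kafka.outboundChannelAdapter(), and reusing the header names ("topic", "key", "partitionId") and the producerFactory() method from the question. It is a fragment meant to replace the bean inside KafkaProduceConfig, not a standalone program:

```java
// Fragment: replaces outboundChannelAdapterFlow() in the question's KafkaProduceConfig.
// Needs org.springframework.integration.dsl.IntegrationFlow and
// org.springframework.integration.kafka.dsl.Kafka on the classpath.
@Bean
public IntegrationFlow outboundChannelAdapterFlow() {
    return IntegrationFlow.from("sendToKafka")
            // The spec itself is passed to handle(), so the Kafka handler is built once
            // at configuration time instead of inside a per-message lambda.
            .handle(Kafka.outboundChannelAdapter(producerFactory())
                    // Each option is a function evaluated against the request message at runtime.
                    .topic(m -> m.getHeaders().get("topic").toString())
                    .messageKey(m -> m.getHeaders().get("key").toString())
                    .partitionId(m -> (Integer) m.getHeaders().get("partitionId")))
            .get();
}
```

Note that the "topic" and "key" headers must actually be present on the message; with this sketch a missing header would cause a NullPointerException when toString() is called.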
This way the MessageHandler is created during the configuration phase, and at runtime its handleMessage() method is called with the request message. All of those options (topic, message key, partition id) are now lambdas that are called at runtime.

P.S. Please edit your question to make the code snippets more readable.