Implementing Request Response in Apache Kafka java

2k views Asked by At

Please find the use case we need to implement.

First, we need to invoke a Kafka producer a message as a rest service, they will process and give back the response in another topic.

For us, It is a request-reply topic we need to reply back for the same request the response, using replykafka template is working fine, but we can set co-relation id in the header.

As a topic message metadata there are sending in attributes, is there any way to map the co-relation id with request topic message and reply topic message.

Explain to you better.

One microservice expects the payload as given below with correlationId in payload.

{
  "operationDate": "2020-09-16T11:58:25",
  "correlationId": "-5544538377183901824042719876882142227",
  "birthDate": "2013-12-12",
  "firstNameEn": "boby",
  "firstNameAr": "الشيخ",
}

The microservice will process the payload and will give a response in another topic as.

{
  "correlationId": -5544538377183901824042719876882142227,
  "consumerId": null,
  "userid": 123456,
  "statusCode": "SUCCESS",
  "errors": null
}

Now as this we need to implement using spring ReplyingKafkaTemplate.

As ReplyingKafkaTemplate will work with correlationId in the header only

2

There are 2 answers

3
Gary Russell On

Assuming you mean you want to include the topic(s) in the correlation id, see

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {

You can create your own correlation id, based on the ProducerRecord (which has the topic()).

You just need to make sure it is unique. If you manually set the KafkaHeaders.REPLY_TOPIC, it will be visible to the strategy.

EDIT

With the correlation id in the payload, use setCorrelationIdStrategy to extract the correlationId from the payload and add a RecordInterceptor to do the same on the reply side.

0
shuaib On

Thanks for the hint.

I have done as overriding the payload with Kafka header correlationId.

@Override
    protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
        if(producerRecord.value()!=null){
           // i have appeneded the header correlationId in th payload
        }
        return super.doSend(producerRecord);
    }

And in Replay onMessage ,i have populated the response payload correlationId to the header.

@Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(
            krConsumerRecord -> //update each record header
        );
        super.onMessage(data);
    }

In this way was successfully integrated request-response semantics with correlationId in the request and response payload.