I am trying to implement a circuit breaker with resilence4j for a kafka consumer that in turn calls a downstream API. Both the kafka consumer and API are reactive.
Requirement: There should be a Circuit Breaker in case the downstream API throws exception and the kafka consumer should pause if the circuit is open. The consumer should resume once the circuit goes back to closed state.
This is my consumer:
@EventListener(ApplicationStartedEvent.class) public void consume() {
reactiveKafkaReceiver
.doOnNext(record -> {
log.info("Reading offset {}, message:{}", record.offset(),record.value());
})
.concatMap(r ->
this.processRecord(r)
.doOnSuccess(event -> r.receiverOffset().acknowledge())
.doOnError(error -> log.error("Error processing consumer events: {} ", error.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
.onErrorResume(error -> {
log.error("Retries exhausted for " + r.value());
//deadLetterPublishingRecoverer.accept(r, new Exception(error));
//r.receiverOffset().acknowledge();
return Mono.empty();
}))
.repeat()
.subscribe();
}
I have been able to implement circuit breaker on the processRecord() method. Problem is, I am not able to pause and resume the kafka consumer based on circuit breaker events.
I have been able to do it in non reactive kafka using the KafkaListenerEndpointRegistry and the messageListenerContainers. I have hit a roadblock for reactive kafka. Any help would be greatly appreciated. Thanks!
I tried the below but got an exception:
reactiveKafkaConsumerTemplate
.doOnConsumer( consumer -> {
// Your code inside doOnConsumer
System.out.println("Consumer is ready");
KafkaConsumer consumer1 = (KafkaConsumer) consumer;
var partitions = (Collection<TopicPartition>)consumer1.partitionsFor("").stream().collect(Collectors.toList());
consumer1.pause(partitions);
//reactiveKafkaConsumerTemplate.assignment().flatMap( o -> reactiveKafkaConsumerTemplate.pause((TopicPartition)o)).subscribe();
return null;
}).subscribe();
Exception: reactor.core.publisher.Operators","message":"Operator called default onErrorDropped","thrown":{"commonElementCount":0,"localizedMessage":"java.lang.IllegalStateException: You must call one of receive*() methods before using doOnConsumer","message":"java.lang.IllegalStateException: You must call one of receive*() methods before using doOnConsumer
Update 2 Tried with below as well:
reactiveKafkaReceiver.doOnConsumer(stringEquipmentMoveConsumer -> {
System.out.println("Consumer is ready");
stringEquipmentMoveConsumer.pause(stringEquipmentMoveConsumer.assignment().stream().collect(Collectors.toList()));
System.out.println("Consumer is ready 1");
return null;
}).subscribe();
No errors, but consumer didn't stop
I see you have that
reactiveKafkaReceiver
variable extracted. So, to perform pause on itsConsumer
you should use this API:See more info in
doOnConsumer
Javadocs: