Pause and resume a ReactiveKafkaReceiver based on condition

147 views Asked by At

I am trying to implement a circuit breaker with resilence4j for a kafka consumer that in turn calls a downstream API. Both the kafka consumer and API are reactive.

Requirement: There should be a Circuit Breaker in case the downstream API throws exception and the kafka consumer should pause if the circuit is open. The consumer should resume once the circuit goes back to closed state.

This is my consumer:

@EventListener(ApplicationStartedEvent.class) public void consume() {

    reactiveKafkaReceiver
            .doOnNext(record -> {
                log.info("Reading offset {}, message:{}", record.offset(),record.value());
            })
            .concatMap(r ->
                this.processRecord(r)
                        .doOnSuccess(event -> r.receiverOffset().acknowledge())
                        .doOnError(error -> log.error("Error processing consumer events: {} ", error.getMessage()))
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
                        .onErrorResume(error -> {
                            log.error("Retries exhausted for " + r.value());
                            //deadLetterPublishingRecoverer.accept(r, new Exception(error));
                            //r.receiverOffset().acknowledge();
                            return Mono.empty();
                        }))
            .repeat()
            .subscribe();
}

I have been able to implement circuit breaker on the processRecord() method. Problem is, I am not able to pause and resume the kafka consumer based on circuit breaker events.

I have been able to do it in non reactive kafka using the KafkaListenerEndpointRegistry and the messageListenerContainers. I have hit a roadblock for reactive kafka. Any help would be greatly appreciated. Thanks!

I tried the below but got an exception:

reactiveKafkaConsumerTemplate
                                    .doOnConsumer( consumer -> {
                                        // Your code inside doOnConsumer
                                        System.out.println("Consumer is ready");
                                        KafkaConsumer consumer1 = (KafkaConsumer) consumer;
                                        var partitions = (Collection<TopicPartition>)consumer1.partitionsFor("").stream().collect(Collectors.toList());
                                        consumer1.pause(partitions);
                                        //reactiveKafkaConsumerTemplate.assignment().flatMap( o -> reactiveKafkaConsumerTemplate.pause((TopicPartition)o)).subscribe();
                                        return null;
                                    }).subscribe();

Exception: reactor.core.publisher.Operators","message":"Operator called default onErrorDropped","thrown":{"commonElementCount":0,"localizedMessage":"java.lang.IllegalStateException: You must call one of receive*() methods before using doOnConsumer","message":"java.lang.IllegalStateException: You must call one of receive*() methods before using doOnConsumer

Update 2 Tried with below as well:

reactiveKafkaReceiver.doOnConsumer(stringEquipmentMoveConsumer -> {
        System.out.println("Consumer is ready");
                                stringEquipmentMoveConsumer.pause(stringEquipmentMoveConsumer.assignment().stream().collect(Collectors.toList()));
                                System.out.println("Consumer is ready 1");
                                return null;
                            }).subscribe();

No errors, but consumer didn't stop

1

There are 1 answers

8
Artem Bilan On

I see you have that reactiveKafkaReceiver variable extracted. So, to perform pause on its Consumer you should use this API:

    receiver.doOnConsumer(consumer -> {
        consumer.pause(...);
        return null;
    })

See more info in doOnConsumer Javadocs:

/**
 * Invokes the specified function on the Kafka {@link Consumer} associated with this {@link KafkaReceiver}.
 * The function is scheduled when the returned {@link Mono} is subscribed to. The function is
 * executed on the thread used for other consumer operations to ensure that {@link Consumer}
 * is never accessed concurrently from multiple threads.
 * <p>
 * Example usage:
 * <pre>
 * {@code
 *     receiver.doOnConsumer(consumer -> consumer.partitionsFor(topic))
 *             .doOnSuccess(partitions -> System.out.println("Partitions " + partitions));
 * }
 * </pre>
 * Functions that are directly supported through the reactive {@link KafkaReceiver} interface
 * like <code>poll</code> and <code>commit</code> should not be invoked from <code>function</code>.
 * The methods supported by <code>doOnConsumer</code> are:
 * <ul>
 *   <li>{@link Consumer#assignment()}
 *   <li>{@link Consumer#subscription()}
 *   <li>{@link Consumer#seek(org.apache.kafka.common.TopicPartition, long)}
 *   <li>{@link Consumer#seekToBeginning(java.util.Collection)}
 *   <li>{@link Consumer#seekToEnd(java.util.Collection)}
 *   <li>{@link Consumer#position(org.apache.kafka.common.TopicPartition)}
 *   <li>{@link Consumer#committed(org.apache.kafka.common.TopicPartition)}
 *   <li>{@link Consumer#metrics()}
 *   <li>{@link Consumer#partitionsFor(String)}
 *   <li>{@link Consumer#listTopics()}
 *   <li>{@link Consumer#paused()}
 *   <li>{@link Consumer#pause(java.util.Collection)}
 *   <li>{@link Consumer#resume(java.util.Collection)}
 *   <li>{@link Consumer#offsetsForTimes(java.util.Map)}
 *   <li>{@link Consumer#beginningOffsets(java.util.Collection)}
 *   <li>{@link Consumer#endOffsets(java.util.Collection)}
 * </ul>
 *
 * @param function A function that takes Kafka {@link Consumer} as parameter
 * @return Mono that completes with the value returned by <code>function</code>
 */
<T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);