Is there any way to modify an existing Flux to receive messages at a slower rate?

I am trying to modify a Kafka Flux, which is listening on a Kafka topic, so that it accepts messages at a slower rate. However, the code below throws an exception.

public class SampleConsumer {

    private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class.getName());

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    boolean slow = false;
    private static final String TOPIC = "demo-topic";

    private final ReceiverOptions<Integer, String> receiverOptions;
    private final DateTimeFormatter dateFormat;

    public SampleConsumer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        receiverOptions = ReceiverOptions.create(props);
        dateFormat = DateTimeFormatter.ofPattern("HH:mm:ss:SSS z dd MMM yyyy");
    }

    public void consumeMessages(String topic, CountDownLatch latch) {

        ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive();
        
        Disposable disposable = kafkaFlux.doOnNext(record -> System.out.println("Received message: value="+
                record.value()) )
        .subscribe(record -> {
            ReceiverOffset offset = record.receiverOffset();
            offset.acknowledge();
            
        });
        
        if (slow) { // logic has been implemented for this
            disposable.dispose();
            
            disposable = kafkaFlux.take(1, true).doOnNext(record -> System.out.println("Received slow message: value="+
                    record.value()) )
            .subscribe(record -> {
                ReceiverOffset offset = record.receiverOffset();
                offset.acknowledge();
                
            });
        }
    }
}

Exception: Caused by reactor.core.Exceptions$StaticRejectedExecutionException : Scheduler unavailable

Is there any way to modify the existing Flux so that it receives messages at a slower rate?
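To make the goal concrete, here is a minimal sketch of the behaviour being asked for: a single subscription stays open, and only the pace at which the next item is requested changes. It is not a verified fix for the exception above. To stay self-contained it uses the JDK's `java.util.concurrent.Flow` API in place of Reactor and reactor-kafka, and `PacedConsumerSketch`, `PacedSubscriber`, `slowDown`, and `run` are hypothetical names invented for the illustration. Reactor's `Flux` follows the same Reactive Streams `request(n)` contract, so the idea may carry over through something like `BaseSubscriber` or `Flux.delayElements`, but that is an assumption to check against the Reactor documentation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PacedConsumerSketch {

    /** Hypothetical subscriber that pulls one item at a time and can be slowed while running. */
    static final class PacedSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private volatile long pauseMillis = 0; // 0 means "ask for the next item immediately"
        private volatile Flow.Subscription subscription;

        /** Switches to slow mode without touching the subscription itself. */
        void slowDown(long pauseMillis) {
            this.pauseMillis = pauseMillis;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // demand exactly one item to start
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            long pause = pauseMillis;
            if (pause > 0) {
                // Slow mode: delay the next request instead of cancelling and resubscribing.
                timer.schedule(() -> subscription.request(1), pause, TimeUnit.MILLISECONDS);
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable error) {
            finish();
        }

        @Override
        public void onComplete() {
            finish();
        }

        private void finish() {
            timer.shutdown();
            done.countDown();
        }
    }

    /** Publishes count messages and switches the subscriber to slow mode halfway through. */
    static List<String> run(int count, long pauseMillis) {
        PacedSubscriber subscriber = new PacedSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                if (i == count / 2) {
                    subscriber.slowDown(pauseMillis);
                }
                publisher.submit("message-" + i);
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 50));
    }
}
```

The point of the sketch is that the rate change happens inside the subscriber by varying when `request(1)` is issued, so nothing is disposed and nothing subscribes a second time.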
