I'm trying to modify a Kafka flux that listens on a Kafka topic so that it accepts messages at a slower rate, but the code below throws an exception.

    import java.time.format.DateTimeFormatter;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOffset;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;

    public class SampleConsumer {

        private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class.getName());
        private static final String BOOTSTRAP_SERVERS = "localhost:9092";
        private static final String TOPIC = "demo-topic";

        boolean slow = false;
        private final ReceiverOptions<Integer, String> receiverOptions;
        private final DateTimeFormatter dateFormat;

        public SampleConsumer(String bootstrapServers) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            receiverOptions = ReceiverOptions.create(props);
            dateFormat = DateTimeFormatter.ofPattern("HH:mm:ss:SSS z dd MMM yyyy");
        }

        public void consumeMessages(String topic, CountDownLatch latch) {
            ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
                    .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                    .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

            Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive();

            // Normal mode: print and acknowledge every record as it arrives.
            Disposable disposable = kafkaFlux
                    .doOnNext(record -> System.out.println("Received message: value=" + record.value()))
                    .subscribe(record -> {
                        ReceiverOffset offset = record.receiverOffset();
                        offset.acknowledge();
                    });

            if (slow) { // logic has been implemented for this
                // Slow mode: cancel the first subscription, then subscribe to the same flux again.
                disposable.dispose();
                disposable = kafkaFlux.take(1, true)
                        .doOnNext(record -> System.out.println("Received slow message: value=" + record.value()))
                        .subscribe(record -> {
                            ReceiverOffset offset = record.receiverOffset();
                            offset.acknowledge();
                        });
            }
        }
    }

Exception:

    Caused by: reactor.core.Exceptions$StaticRejectedExecutionException: Scheduler unavailable
Is there any way to modify the existing flux so that it receives messages at a slower rate?
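
One direction that might avoid the dispose-and-resubscribe step altogether is to keep a single subscription and let a shared flag decide how long each record is held back. The sketch below is only an illustration of that idea in plain Java: `RateGate`, `setSlow`, and `currentDelay` are hypothetical names of my own, not part of Reactor or reactor-kafka, and the string array merely stands in for Kafka records.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: decides how long to pause before handling the next record. */
final class RateGate {
    private final AtomicBoolean slow = new AtomicBoolean(false);
    private final Duration slowDelay;

    RateGate(Duration slowDelay) {
        this.slowDelay = slowDelay;
    }

    /** Switches between normal and slow mode; safe to call from any thread. */
    void setSlow(boolean value) {
        slow.set(value);
    }

    /** Returns zero in normal mode and the configured delay in slow mode. */
    Duration currentDelay() {
        return slow.get() ? slowDelay : Duration.ZERO;
    }

    public static void main(String[] args) throws InterruptedException {
        RateGate gate = new RateGate(Duration.ofMillis(200));
        String[] records = {"a", "b", "c", "d"}; // stand-ins for Kafka records

        for (int i = 0; i < records.length; i++) {
            if (i == 2) {
                gate.setSlow(true); // switch to slow mode mid-stream, no resubscription
            }
            Thread.sleep(gate.currentDelay().toMillis());
            System.out.println("Received message: value=" + records[i]);
        }
    }
}
```

In a Reactor pipeline, the same gate could presumably be applied once, before `subscribe`, with something like `kafkaFlux.concatMap(record -> Mono.just(record).delayElement(gate.currentDelay()))`, or with a fixed `delayElements(Duration)` if the rate never needs to change. I have not verified this against the setup above, so treat it as an assumption rather than a confirmed fix.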