Reactive program exiting early before sending all messages to Kafka

This is a follow-up to a previous reactive Kafka question (Issue while sending the Flux of data to the reactive kafka).

I am trying to send some log records to Kafka using the reactive approach. Here is the code that sends the messages using reactive Kafka.

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            Thread.sleep(0, 5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).doOnNext(res -> sentCount.incrementAndGet()).then()
        .doOnError(e -> {
            // Pass the exception as the last argument so the logger records its stack trace
            log.error("[FAIL]: Send to the topic: '{}' failed.", topic, e);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
    }

}


public class ExecuteQuery implements Runnable {

    private LogProducer producer = new LogProducer("localhost:9092");

    @Override
    public void run() {
        Flux<Logs.Data> records = ...
        producer.sendMessages(kafkaTopic, records);
        .....
        .....
        // processing related to the messages sent
    }

}

Even with the Thread.sleep(0, 5); in place, sometimes not all messages are sent to Kafka, and the program exits early after printing the SUCCESS message (log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);). Is there a more reliable way to solve this problem? For example, some kind of callback, so that the thread waits until all messages have been sent successfully.

I have a Spring console application and run ExecuteQuery through a scheduler at a fixed rate, something like this:

public class Main {

    // Static so that they can be used from the static main method and the nested class
    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private static final ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

    public static void main(String[] args) {
        QueryScheduler scheduledQuery = new QueryScheduler();
        scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
    }

    static class QueryScheduler implements Runnable {

        @Override
        public void run() {
            // preprocessing related to time
            executor.execute(new ExecuteQuery());
            // postprocessing related to time
        }

    }
}

1 answer

Answered by Artem Bilan

Your Thread.sleep(0, 5); // sleep for 5 ns does nothing to keep the main thread blocked, so the main thread exits as soon as it is done, while your ExecuteQuery may not have finished its job yet.

It is not clear how you start your application, but what I recommended was a Thread.sleep() in the main thread itself, to block it. To be precise, in the body of the public static void main(String[] args) method.
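
As an alternative to sleeping for a guessed amount of time, the calling thread can wait on a completion signal, which is the "callback" idea from the question. The following is only a minimal sketch using the JDK alone: sendAsync is a hypothetical stand-in for the asynchronous send pipeline, not the Kafka API, and the class and method names are made up for illustration. In the reactive code above, the same latch could presumably be counted down from a terminal callback such as doFinally on the Mono returned by then(), or subscribe() could be replaced with block(); neither variant has been tested against the setup in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitForSendSketch {

    // Hypothetical stand-in for the asynchronous send: the work runs on another
    // thread and onDone is invoked once it has finished (successfully or not).
    static void sendAsync(int messageCount, AtomicInteger sentCount, Runnable onDone) {
        CompletableFuture.runAsync(() -> {
            for (int i = 0; i < messageCount; i++) {
                sentCount.incrementAndGet(); // pretend one record was acknowledged
            }
        }).whenComplete((ignored, error) -> onDone.run());
    }

    // Starts the asynchronous send and blocks the calling thread until the
    // completion callback fires or the timeout elapses.
    static int sendAndWait(int messageCount, long timeoutSeconds) throws InterruptedException {
        AtomicInteger sentCount = new AtomicInteger(0);
        CountDownLatch done = new CountDownLatch(1);

        sendAsync(messageCount, sentCount, done::countDown);

        // await returns false if the timeout elapsed before countDown was called
        if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for the send to complete");
        }
        return sentCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sent " + sendAndWait(1000, 10) + " records");
    }
}
```

The timeout on await is a deliberate choice: if the callback never fires, the caller fails loudly instead of hanging forever.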