Reactive Kafka threads not getting created


I created a Reactor Kafka consumer as shown below:

// Build receiver options from the consumer properties and subscribe to the topic
ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProperties)
        .subscription(Collections.singleton("topic-name"));

KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(options);

kafkaReceiver.receive()
        .doOnNext(record -> logger.info("Message Received"))
        // hand records off to a custom parallel scheduler for processing
        .publishOn(Schedulers.newParallel("customParallelThread"))
        .subscribe(record -> {
            logger.info("Message Processing started");
            // functional logic
        });

After running this for 1K records, I see two things:

1. Only one Kafka consumer thread consumes all the records and logs "Message Received", even though concurrency is set to 8 in the consumer properties.
2. Only one customParallelThread is created. It logs "Message Processing started" and executes the rest of the logic, even though 8 cores are available.

I would appreciate any input on this. Thanks in advance.
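
For context on the second observation: in Reactor, publishOn takes a single worker from the given Scheduler and delivers every downstream signal on it, so one customParallelThread is the expected result no matter how many cores the scheduler could use. A rough sketch of one possible way to spread the work out is to split the stream into rails with parallel() and runOn(). The sketch below uses Flux.range as a stand-in for kafkaReceiver.receive() (an assumption, so that it runs without a broker), and the class and method names are hypothetical. It compares the worker threads seen with publishOn alone against those seen with parallel rails.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; Flux.range stands in for kafkaReceiver.receive().
public class ParallelRailsSketch {

    // Processes 1,000 items and returns the names of the threads that handled them.
    static Set<String> threadsUsed(boolean useParallelRails) {
        // Same scheduler name as in the question, with an explicit parallelism of 8.
        Scheduler scheduler = Schedulers.newParallel("customParallelThread", 8);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        try {
            Flux<Integer> source = Flux.range(1, 1000);
            if (useParallelRails) {
                source.parallel(8)            // split the stream into 8 rails
                      .runOn(scheduler)       // each rail takes its own worker
                      .doOnNext(i -> threads.add(Thread.currentThread().getName()))
                      .sequential()           // merge the rails back into one Flux
                      .blockLast();
            } else {
                source.publishOn(scheduler)   // one worker handles everything downstream
                      .doOnNext(i -> threads.add(Thread.currentThread().getName()))
                      .blockLast();
            }
        } finally {
            scheduler.dispose();              // release the scheduler's threads
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("publishOn only: " + threadsUsed(false));
        System.out.println("parallel rails: " + threadsUsed(true));
    }
}
```

Note that rails process records concurrently, so per-partition ordering and offset acknowledgement would need separate care in a real consumer. This sketch also does not address the first observation: a single KafkaReceiver polls with one consumer, so getting several consumer threads would presumably mean creating several receivers in the same consumer group.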
