Reactor Kafka is causing the interruption of consumer messages from random partitions

55 views Asked by At

Seems like a persistent issue with my reactive Kafka consumer—it intermittently halts message consumption from different partitions.

A restart temporarily fixes the problem, but it resurfaces after some time. Despite experimenting with various Kafka consumer properties, the root cause remains elusive.

Any help or insights would be highly appreciated.

    private val kafkaGlobalConfigProperties = kafkaGlobalConfigProperties()

    fun initialize(
        consumerTopic: String,
        consumerGroup: String,
        consumerAutoOffset: String,
        maxPollSize: Int,
        maxPollIntervalMs: Int,
        concurrency: Int,
        maxDeferredCommits: Int,
        commitBatchSize: Int,
        bufferMaxSize: Int,
        commitInterval: Long,
        maxPartitionFetchBytes: Int? = 1 * 1024 * 1024,
        operation: suspend (record: ReceiverRecord<String, ByteArray>) -> Unit
    ) {
        val kafkaReceiver = kafkaReceiver(
            consumerGroup = "test-consumer-group",
            consumerAutoOffset = "latest",
            maxPollSize = 1000,
            maxPollIntervalMs = 300000,
            consumerTopic = "test-topic",
            commitBatchSize = 2000,
            commitInterval = 0,
            maxDeferredCommits = 3000,
            maxPartitionFetchBytes = 1 * 1024 * 1024
        )

        kafkaReceiver.receive()
            .flatMap({
                mono {
                    executeWith(operation, it)
                }.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)))
                    .onErrorResume { ex ->
                        it.receiverOffset().acknowledge()
                        log.error("Error in processing record: ${ex.message}", ex)
                        Mono.empty()

                    }
            }, concurrency)
            .retryWhen(
                Retry.indefinitely()
            )
            .doOnError { error ->
                log.error(error) { "received error in reactive stream pipeline $error" }
            }
            .buffer(bufferMaxSize)
            .repeat()
            .subscribe()
    }

    private suspend fun executeWith(
        operation: suspend (record: ReceiverRecord<String, ByteArray>) -> Unit,
        it: ReceiverRecord<String, ByteArray>
    ) {
        try {
            operation(it)
            it.receiverOffset().acknowledge()
        } catch (ex: TimeoutCancellationException) {
            log.warn("Record processing timed out: ${ex.message}")
            it.receiverOffset().acknowledge()
        }
    }

    private fun kafkaReceiver(
        consumerGroup: String,
        consumerAutoOffset: String,
        maxPollSize: Int,
        maxPollIntervalMs: Int,
        consumerTopic: String,
        commitBatchSize: Int,
        commitInterval: Long,
        maxDeferredCommits: Int,
        maxPartitionFetchBytes: Int?
    ): KafkaReceiver<String, ByteArray> {
        val consumerProperties = consumerProperties(
            consumerGroup, consumerAutoOffset,
            maxPollSize, maxPollIntervalMs, maxPartitionFetchBytes
        )
        val receiverOptions = ReceiverOptions.create<String, ByteArray>(consumerProperties)
            .subscription(setOf(consumerTopic))
            .commitBatchSize(commitBatchSize)
            .commitInterval(Duration.ofMillis(commitInterval))
            .maxDeferredCommits(maxDeferredCommits)
            .addAssignListener { partitions ->
                partitions.forEach { partition ->
                    log.info(
                        "Assigned partition: ${partition.topicPartition().partition()} " +
                            "on topic ${partition.topicPartition().topic()}"
                    )
                }
            }
            .addRevokeListener { partitions ->

                partitions.forEach { partition ->
                    log.info(
                        "Revoked partition: ${partition.topicPartition().partition()} " +
                            "on topic ${partition.topicPartition().topic()}"
                    )
                }

            }

        return KafkaReceiver.create(receiverOptions)
    }

    private fun consumerProperties(
        consumerGroup: String,
        consumerAutoOffset: String,
        maxPollSize: Int,
        maxPollIntervalMs: Int,
        maxPartitionFetchBytes: Int?
    ): Properties {
        val consumerProperties = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaGlobalConfigProperties.bootstrapServer)
            put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffset)
            put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollSize)
            put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs)
            put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes)
            put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        }
        consumerProperties.putAll(kafkaGlobalConfigProperties.credentialsConfig())
        return consumerProperties
    }
}



0

There are 0 answers