Micronaut-Kafka with auto.commit.enable=false: how commit manually an offset

1.7k views Asked by At

I want to commit a message only if it was saved succesfully in database.

I understand I have turned off auto-commit with this application.yml

micronaut:
  application:
    name: demoGrpcKafka
  executors:
    consumer:
      type: fixed
      nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
  bootstrap:
    servers: localhost:9092
  consumers:
    default:
      auto:
        commit:
          enable: false
  producers:
    #default:
    demo-producer:
      retries: 2

Consumer

package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaListener(groupId="myGroup")
class DemoConsumer {
    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
    ){
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp ")

        //saved to database
        
        // THE ISSUE IS HERE: how commit like consumer.commitOffsets(true) ?????
    }
}

In other words, how I either commitOffset or commitSync() or any other alternative to commit the message manually while using Micronaut-Kafka?

*** second edition

I returned to application.yaml

  consumers:
    default:
      auto:
        commit:
          enable: false

*** third edit

I tried add either io.micronaut.configuration.kafka.Acknowledgement (deprecated) or import io.micronaut.messaging.Acknowledgement and either one caused

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

It seems I have to do something else in order to Micronaut inject such Acknowledgement object. What I am missing bellow?

package com.tolearn.consumer

import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
//import io.micronaut.messaging.Acknowledgement
//import io.micronaut.messaging.annotation.Header

@KafkaListener(
        groupId="myGroup",
        offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
    @Topic("demotopic")
    fun receive(@KafkaKey key: String?,
                acknowledgement: Acknowledgement,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
                //,header: Header
    ){
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp "
                // + "header = $header"
        )

        //saved to database

        // how commit like consumer.commitOffsets(true)
        //Consumer.commitSync()

        acknowledgement.ack();
    }
}

The whole log is

18:13:13.812 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Kafka consumer [com.tolearn.consumer.DemoConsumer@17e970dd] failed to deserialize value: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
    at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:73)
    at io.micronaut.configuration.kafka.serde.JsonSerde.deserialize(JsonSerde.java:82)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process$8(KafkaConsumerProcessor.java:396)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3564)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2899)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchNull(UTF8StreamJsonParser.java:2870)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:844)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4513)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3529)
    at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:71)
    ... 18 common frames omitted
18:13:13.812 [consumer-executor-thread-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=demo-grpc-kafka-demo-consumer, groupId=myGroup] Seeking to offset 5 for partition demotopic-0
1

There are 1 answers

8
Michael Heil On BEST ANSWER

According to the documentation you can set the offsetStrategy in your KafkaListener annotation like

@KafkaListener(groupId="myGroup", offsetStrategy=OffsetStrategy.SYNC)
class DemoConsumer {
    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
[...]

to one of the following options:

ASYNC: Asynchronously commit offsets using Consumer.commitAsync() after each batch of messages is processed.
ASYNC_PER_RECORD: Asynchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.
AUTO: Automatically commit offsets with the Consumer.poll(long) loop.
DISABLED: Do not commit offsets.
SYNC: Synchronously commit offsets using Consumer.commitSync() after each batch of messages is processed.
SYNC_PER_RECORD: Synchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.

If I understand your question correctly, you want to set it to SYNC.