Commit message Individually or Batchwise using Sarama - kafka client for Go

3.5k views Asked by At

I was trying to make kafka consumer which would collect messages for a particular amount of time after that I could manually commit with the messages that has been collected. But I could not find a method or api from shopify sarama that could be used to commit a message or a batch of messages , Please help out

2

There are 2 answers

0
ppatierno On

With autocommit you don't have full control on when it's happening anyway. It's periodic and happens behind the scenes for you. If it's not ok for you you can also use the ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string) for committing whenever you want (so even after a specific amount of time) an offset as the last of a batch of consumed messages.

0
Matteo On

You could use the Interval params of the AutoCommit field of the Offsets Config:

// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
    // Deprecated: CommitInterval exists for historical compatibility
    // and should not be used. Please use Consumer.Offsets.AutoCommit
    CommitInterval time.Duration

    // AutoCommit specifies configuration for commit messages automatically.
    AutoCommit struct {
        // Whether or not to auto-commit updated offsets back to the broker.
        // (default enabled).
        Enable bool

        // How frequently to commit updated offsets. Ineffective unless
        // auto-commit is enabled (default 1s)
        Interval time.Duration
    }

As example:

// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...

// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute