I'm new in kafka. Now, i'm trying to commit kafka message explicitly like if my all mysql operation successful then i'll commit the record but kafka reads next offset if the previous offset has not been committed yet.

Codes are following:

`$conf = new \RdKafka\Conf();
$conf->set('group.id', $groupId);
$conf->set('metadata.broker.list', $brokers);
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'true');
$conf->set('enable.auto.offset.store', 'false');
$conf->set('auto.commit.interval.ms', 100);
$conf->set('enable.partition.eof', 'true');

$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe($subscriptionArr);

$this->logger->info("Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)", [$groupId, $brokers, $subscriptionArr]);
$tmp=0;
//let add some counters so at least we have some numbers of topics we processed...
$search_array=[];
while ($active) {
    $message = $consumer->consume(120*1000);
    
    if ($this->debug) {
        $active = false;
    }
    
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            $topic_name = $message->topic_name;
            $timestamp = $message->timestamp;
            $payload = $message->payload;
            $message_offset = $message->offset;
            $partition = $message->partition;
            $timeoutMs = 10000000;
            
            $this->logger->info("topic partition details ", [$partition]);

            $topic = $consumer->newTopic($topic_name);
            $topicPartition = new \RdKafka\TopicPartition($topic_name, $partition);
            $this->logger->info("topic partitions",[$topicPartition]);
            $partition_id = $topicPartition->getPartition();
            $this->logger->info("topic partitions",[$partition_id]);
            $topicPartitionsWithOffsets = $consumer->getCommittedOffsets([$topicPartition], $timeoutMs);
            $queryExecutionStatus = $this->sinkConnector->injectKafka($topic_name, $payload, $timestamp);

            $this->logger->info($topic_name." query execution status",[$queryExecutionStatus]);
            $this->logger->info("before commited offset ", [$topicPartitionsWithOffsets]);
            if ($queryExecutionStatus == 1) {
                if (array_key_exists($topic_name, $search_array) ) {
                    $search_array[$topic_name] = $search_array[$topic_name] + 1;
                } else {
                    // set initial value of the counter
                    $search_array[$topic_name]=1;
                }
                $this->logger->info("manually commited offset of topic ".$topic_name, [$message_offset]);  

                $topic->offsetStore($message->partition, $message_offset);
                $storeOffset = $topicPartition->getOffset();
                $this->logger->info("commited offset ", [$storeOffset]);
            } else {
                $this->logger->warning("Warning: Message not inserted/updated ".$topic_name, [$message_offset]);
            }  
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            $this->logger->notice("No more messages on partition; will wait for more", [$search_array]);
            
            $tmp++;
            $search_array=[];
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            //on-timeouts.we.can.rest.the.cpu.of.the.machine.
            $this->logger->notice("Timed out", [$tmp]);
            
            $active = false;
            break;
        default:
            $this->logger->warning("Warning: Kafka-Exception", [message=>$message->errstr(), error=> $message->err, count=> $tmp]);
            //throw new \Exception($message->errstr(), $message->err);
            $active = false;
            break;
    }
}
// flush and close the kafka-client and make sure it does not leave any open files...
$this->logger->notice("Disconnecting from the nodes and go to sleep mode");
//$consumer->commit(); //we dont have a no local offset stored...;) this forcefully lowers: ls /proc/$pid/fd/ | wc -l
$consumer->unsubscribe();
$consumer->close();
//force-closure.to.release.fopens
unset($consumer);
$consumer = null;` 

I tried commit() of php rdkafka to commit it synchronously. Also tried to change value of auto.commit.interval.ms configuration

2

There are 2 answers

0
HiHello On BEST ANSWER

Since Kafka has a Pull-based model, meaning consumers have full control over the data consumption and brokers play the role of data storage. Brokers have no knowledge about consumers and how many records were consumed by an arbitrary consumer, let's say all parameters for consuming are provided by a consumer.

There is a definition of a last consumed offset (current offset) and last committed offset. The values of these offsets are different. The last consumed offset usually is greater than the last committed offset and simply represents the offset value of the latest record that was consumed (for more details you can check a short article: https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/offset-management/). So the reason why your consumer keeps consume records is last consumed offset

As far as I know, to suspend a record consumption by an offset that is not committed you should "suspend" a consumer itself. In Java, there are pause and resume methods for that purpose, but your question is about PHP, so I did a little pick in Kafka docs for PHP but didn't find the explicit counterpart. Perhaps, you can check rd_kafka_pause_partitions() and rd_kafka_resume_partitions() (see https://idealo.github.io/php-rdkafka-ffi/api/RdKafka/FFI/Library/#rd_kafka_pause_partitions)

1
Mathew On

By defalut, kafka commits automatically, you should set enable.auto.commit to false.

set('enable.auto.commit', 'false');