Problem with kafka request v3+ serealization. Broker cant deserialize message

33 views Asked by At

Please help with kafka protocol. Don't known why request doesn't work. I think that problem in Compact version of strings. But don't known exact reason.

I send FindCoordinator v3 request to Kafka and receive error (from kafka blocker console):

2024-03-14T11:51:08+03:00 [2024-03-14 08:51:08,808] ERROR Exception while processing request from 10.89.0.45:9092-192.168.127.1:40345-26 (kafka.network.Processor)
2024-03-14T11:51:08+03:00 org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 10
2024-03-14T11:51:08+03:00 Caused by: java.nio.BufferUnderflowException
2024-03-14T11:51:08+03:00  at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00  at java.base/java.nio.ByteBuffer.get(Unknown Source)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:52)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.message.RequestHeaderData.read(RequestHeaderData.java:135)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.message.RequestHeaderData.<init>(RequestHeaderData.java:84)
2024-03-14T11:51:08+03:00  at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:95)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.parseRequestHeader(SocketServer.scala:999)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1012)
2024-03-14T11:51:08+03:00  at java.base/java.util.LinkedHashMap$LinkedValues.forEach(Unknown Source)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
2024-03-14T11:51:08+03:00  at kafka.network.Processor.run(SocketServer.scala:893)
2024-03-14T11:51:08+03:00  at java.base/java.lang.Thread.run(Unknown Source)

Kafka version: 2.8.1

The binary format of request (total 29 bytes):

    //Message Size
    [0] = {byte} 0
    [1] = {byte} 0
    [2] = {byte} 0
    [3] = {byte} 25

    /// --- Header Start
    /// ApiKey = 10
    [4] = {byte} 0
    [5] = {byte} 10

    // Version = 3
    [6] = {byte} 0
    [7] = {byte} 3

    // CorrelationId = 0
    [8] = {byte} 0
    [9] = {byte} 0
    [10] = {byte} 0
    [11] = {byte} 0

    // 'clt' string as Nullable string
    // 'clt'.length as Int16
    [12] = {byte} 0
    [13] = {byte} 3 
    // 'clt' as utf-8
    [14] = {byte} 99
    [15] = {byte} 108
    [16] = {byte} 116

    /// --- FindCoordinator start
    // 'some_group' as CompactString
    [17] = {byte} 11  // length = 'some_group'.length + 1 as VarUInt
    [18] = {byte} 115
    [19] = {byte} 111
    [20] = {byte} 109
    [21] = {byte} 101
    [22] = {byte} 95
    [23] = {byte} 103
    [24] = {byte} 114
    [25] = {byte} 111
    [26] = {byte} 117
    [27] = {byte} 112
    // KeyType = 0 - group
    [28] = {byte} 0

All works with FindCoordinator v2 request (30 bytes total): //Message Size [0] = {byte} 0 [1] = {byte} 0 [2] = {byte} 0 [3] = {byte} 26

/// --- Header Start
/// ApiKey = 10
[4] = {byte} 0
[5] = {byte} 10

// Version = 2
[6] = {byte} 0
[7] = {byte} 2

// CorrelationId = 0
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0

// 'clt' string as Nullable string
[12] = {byte} 0
[13] = {byte} 3
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116

/// --- FindCoordinator start
// 'some_group' as KafkaString
// size = 10 (2 bytes as Int16)
[17] = {byte} 0
[18] = {byte} 10
// utf-8 'some_group'
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
// KeyType = 0 - group
[29] = {byte} 0

Same issues with other requests with version greater or equal 3.

1

There are 1 answers

1
Геннадий Ефимов On

I found an answer. V3 requires taggedFiels (TAG_BUFFER) in RequestHeader and in FindCoorinator payload.

I thought tagged fields only supported from version 9. My bed. I should read the documentation more carefully.

//total message size = 28
// message size as Int32 = 24
[0] = {byte} 0
[1] = {byte} 0
[2] = {byte} 0
[3] = {byte} 24

/////------- Header v3 
// ApiKey=10 as Int16
[4] = {byte} 0
[5] = {byte} 10
// ApiVersion = 3 as Int16
[6] = {byte} 0
[7] = {byte} 3
// CorrelationId = 0 as Int32
[8] = {byte} 0
[9] = {byte} 0
[10] = {byte} 0
[11] = {byte} 0
// 'clt'.length as Int16
[12] = {byte} 0
[13] = {byte} 3 
// ClientId = 'clt' as Utf8 bytes
[14] = {byte} 99
[15] = {byte} 108
[16] = {byte} 116
//TaggedFiels - empty (len=0 as VarUint)
[17] = {byte} 0

/////------- FindCoordinator payload, v3
// 'some_group'.length as VarUInt
[18] = {byte} 11
// 'some_group' as utf8 bytes
[19] = {byte} 115
[20] = {byte} 111
[21] = {byte} 109
[22] = {byte} 101
[23] = {byte} 95
[24] = {byte} 103
[25] = {byte} 114
[26] = {byte} 111
[27] = {byte} 117
[28] = {byte} 112
//KeyType = 0 (group) as Int8
[29] = {byte} 0
//TaggedFiels - empty (len=0 as VarUint)
[30] = {byte} 0