In a Kafka Streams application, I am encountering intermittent SaslAuthenticationException errors. I am using the SASL_SSL security protocol with the following configuration:
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com//oauth2/token
spring.kafka.properties.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
spring.kafka.properties.sasl.mechanism=OAUTHBEARER
spring.kafka.properties.sasl.jaas.config=
Using the following JAAS Config:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="" clientSecret="" unsecuredLoginStringClaim_sub="admin" scope="PatrickOauth" extension_logicalCluster="lkc-5x8xxx" extension_identityPoolId="pool-P5xx";
The error I see in the logs is:
{"ts": "2024-03-21 18:44:57,860", "level": "ERROR", "source": "org.apache.kafka.clients.NetworkClient:764","message":[Producer clientId=producer-3] Connection to node 1 (b1-xxx-xxxxxx.eu-west-1.aws.confluent.cloud/xx.xx.xx.207:9092) failed authentication due to: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY}
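Because the error names the logicalCluster extension, one way to rule out a simple configuration slip is to confirm that the resolved JAAS string really carries non-empty extension values. The following is only a minimal sketch under assumptions: JaasExtensionCheck, parseOptions, and missingExtensions are hypothetical names (not part of the Kafka API), and the regex handles only simple key="value" options rather than the full JAAS grammar.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not Kafka API: checks a JAAS config string for
// missing or empty extension_* options.
class JaasExtensionCheck {
    // Matches key="value" pairs; a simplification of the real JAAS grammar.
    private static final Pattern OPTION = Pattern.compile("(\\w+)=\"([^\"]*)\"");

    // Collects every key="value" option found in the JAAS string, in order.
    static Map<String, String> parseOptions(String jaasConfig) {
        Map<String, String> options = new LinkedHashMap<>();
        Matcher m = OPTION.matcher(jaasConfig);
        while (m.find()) {
            options.put(m.group(1), m.group(2));
        }
        return options;
    }

    // Returns the required extension names that are absent or blank.
    static List<String> missingExtensions(String jaasConfig, String... required) {
        Map<String, String> options = parseOptions(jaasConfig);
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            String value = options.get("extension_" + name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Same shape as the JAAS config in the question (values redacted there).
        String jaas = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                + "clientId=\"\" clientSecret=\"\" unsecuredLoginStringClaim_sub=\"admin\" "
                + "scope=\"PatrickOauth\" extension_logicalCluster=\"lkc-5x8xxx\" "
                + "extension_identityPoolId=\"pool-P5xx\";";
        System.out.println("Missing extensions: "
                + missingExtensions(jaas, "logicalCluster", "identityPoolId"));
    }
}
```

For the JAAS config shown above, this check reports nothing missing, so a static configuration slip does not obviously explain the intermittent failures.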
I have enabled debug logs to investigate the issue further. Upon checking the logs, it seems that the error occurs while fetching metadata:
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.consumer.internals.Fetcher:274","message":[Consumer clientId=consumer-int.group-6, groupId=int.group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(test.topic1-16, test.topic1-10, test.topic1-4, test.topic1-22), canUseTopicIds=True) to broker b1-pkc-z56v0.eu-west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)}
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.NetworkClient:521","message":[Consumer clientId=consumer-int.group-6, groupId=int.group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-int.group-6, correlationId=37685) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=100000, maxBytes=52428800, isolationLevel=0, sessionId=1487743369, sessionEpoch=6102, topics=[], forgottenTopicsData=[], rackId='')}
I also get frequent log entries for a Kafka Streams health check failure:
2024-03-21 13:19:52.259 WARN 1 --- [nio-8080-exec-3] .b.k.s.KafkaStreamsBinderHealthIndicator : Kafka-streams health check failed
I also see the following error:
{"ts": "2024-03-21 18:38:39,437", "level": "DEBUG", "source": "o.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics:163","message":Cannot generate metric for topic: int.push.sea.sobject.create} java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
The ConcurrentModificationException sometimes occurs just before the SaslAuthenticationException, but I am not sure whether the two are related. Why am I getting the SaslAuthenticationException, and how can I fix it? A longer excerpt of the debug logs follows:
currentLeader=LeaderAndEpoch{leader=Optional[b1-pkc-xxxxx.eu.west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)], epoch=26}} to node b1-pkc-xxxxx.eu.west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)}
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.consumer.internals.Fetcher:1245","message":[Consumer clientId=consumer-stg.grp.test-sink-6, groupId=stg.grp.test-sink] Added READ_UNCOMMITTED fetch request for partition stg.push.test.upsert-22 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b1-pkc-xxxxx.eu.west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)], epoch=26}} to node b1-pkc-xxxxx.eu.west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)}
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.FetchSessionHandler:351","message":[Consumer clientId=consumer-stg.grp.test-sink-6, groupId=stg.grp.test-sink] Built incremental fetch (sessionId=1487743369, epoch=6102) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 4 partition(s)}
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.consumer.internals.Fetcher:274","message":[Consumer clientId=consumer-stg.grp.test-sink-6, groupId=stg.grp.test-sink] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(stg.push.test.upsert-16, stg.push.test.upsert-10, stg.push.test.upsert-4, stg.push.test.upsert-22), canUseTopicIds=True) to broker b1-pkc-xxxxx.eu.west-1.aws.confluent.cloud:9092 (id: 1 rack: euw1-az2)}
{"ts": "2024-03-21 18:37:43,594", "level": "DEBUG", "source": "org.apache.kafka.clients.NetworkClient:521","message":[Consumer clientId=consumer-stg.grp.test-sink-6, groupId=stg.grp.test-sink] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-stg.grp.test-sink-6, correlationId=37685) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=100000, maxBytes=52428800, isolationLevel=0, sessionId=1487743369, sessionEpoch=6102, topics=[], forgottenTopicsData=[], rackId='')}
{"ts": "2024-03-21 18:37:43,600", "level": "DEBUG", "source": "o.a.kafka.common.security.authenticator.SaslClientAuthenticator:411","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Set SASL client state to FAILED}
{"ts": "2024-03-21 18:37:43,601", "level": "INFO", "source": "org.apache.kafka.common.network.Selector:616","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Failed re-authentication with b5-pkc-xxxxx.eu.west-1.aws.confluent.cloud/52.211.15.1 (Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY)}
{"ts": "2024-03-21 18:37:43,601", "level": "INFO", "source": "org.apache.kafka.clients.NetworkClient:935","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Node 5 disconnected.}
{"ts": "2024-03-21 18:37:43,601", "level": "ERROR", "source": "org.apache.kafka.clients.NetworkClient:764","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Connection to node 5 (b5-pkc-xxxxx.eu.west-1.aws.confluent.cloud/52.211.15.1:9092) failed authentication due to: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY}
{"ts": "2024-03-21 18:37:43,601", "level": "DEBUG", "source": "org.apache.kafka.clients.NetworkClient:335","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Cancelled in-flight LIST_OFFSETS request with correlation id 2249 due to node 5 being disconnected (elapsed time since creation: 5008ms, elapsed time since send: 5008ms, request timeout: 30000ms): ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='stg.push.test.upsert', partitions=[ListOffsetsPartition(partitionIndex=14, currentLeaderEpoch=26, timestamp=-1, maxNumOffsets=1), ListOffsetsPartition(partitionIndex=8, currentLeaderEpoch=26, timestamp=-1, maxNumOffsets=1), ListOffsetsPartition(partitionIndex=2, currentLeaderEpoch=26, timestamp=-1, maxNumOffsets=1), ListOffsetsPartition(partitionIndex=20, currentLeaderEpoch=26, timestamp=-1, maxNumOffsets=1)])])}
{"ts": "2024-03-21 18:37:43,602", "level": "DEBUG", "source": "o.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:596","message":[Consumer clientId=consumer-stg.grp.test-sink-7, groupId=stg.grp.test-sink] Cancelled request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=7, clientId=consumer-stg.grp.test-sink-7, correlationId=2249) due to node 5 being disconnected}
{"ts": "2024-03-21 18:37:43,602", "level": "DEBUG", "source": "o.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics:163","message":Cannot generate metric for topic: stg.push.test.upsert} org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY