We have configured two Kafka brokers in application.yaml, one with SASL Kerberos (GSSAPI) and the other with SASL SCRAM. On startup, the service connects to the Kerberos broker but gets the error below for the other broker (SASL SCRAM). When we configure only the SASL SCRAM broker in application.yaml, it connects without any error.
==============================================================================================
Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
main] o.a.k.c.s.a.SaslClientAuthenticator Set SASL client state to SEND_HANDSHAKE_REQUEST
main] o.a.k.c.s.a.SaslClientAuthenticator Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
main] o.a.k.c.s.a.SaslClientAuthenticator Set SASL client state to INITIAL
main] o.apache.kafka.common.network.Selector Unexpected error from 100.76.140.194; closing connection
java.lang.NullPointerException: null
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:389)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:296)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:237)
application.yaml:

binders:
  binder1:
    type: kafka
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                replication-factor: 1
                brokers: ${eventhub.broker.hosts2}
                zkNodes: ${eventhub.zookeper.hosts2}
                configuration:
                  security:
                    protocol: SASL_SSL
                  sasl:
                    mechanism: GSSAPI
                  ssl:
                    truststore:
                      location: ${eventhub.broker.cert.location2}
                      password: ${eventhub.broker.cert.password2}
                jaas:
                  options:
                    useKeyTab: true
                    storeKey: true
                    keyTab: /scratch/kafka/kafka2/krb5.keytab
                    serviceName: kafka
                    principal: kafka/XXXXXXXXXXXXXXXX.COM
              default:
                consumer:
                  autoCommitOffset: false
  binder2:
    type: kafka
    environment:
      spring:
        cloud:
          stream:
            kafka:
              binder:
                brokers: ${eventhub.broker.hosts} # 10.40.158.93:9093
                zkNodes: ${eventhub.zookeper.hosts} # 10.40.158.93:2181
                autoCreateTopics: false
                zkConnectionTimeout: 36000
                headers:
                  - event
                  - sourceSystem
                  - userId
                  - branchCode
                  - kafka_messageKey
                jaas:
                  loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                  options:
                    username: ${eventhub.broker.user}
                    password: ${eventhub.broker.password}
                configuration:
                  security:
                    protocol: SASL_SSL
                  sasl:
                    mechanism: SCRAM-SHA-256
                  ssl:
                    enabled:
                    truststore:
                      location: ${eventhub.broker.cert.location}
                      password: ${eventhub.broker.cert.password}
When a single application connects to multiple clusters with different security contexts, do not rely on setting the JAAS configuration through the binder or on the java.security.auth.login.config property. Use the approach described in KIP-85 instead: set the sasl.jaas.config property, which takes precedence over the other methods. Otherwise the JVM uses a JVM-wide static security context and ignores any JAAS configuration found after the first one, which is why only the first binder authenticates. Setting sasl.jaas.config per binder works around that restriction.

Here is a sample application that demonstrates how to connect to multiple Kafka clusters with different security contexts as a multi-binder application.
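
Applied to the configuration in the question, the change might look roughly like the sketch below. This is an untested outline, not a verified fix: it assumes the binders block sits under spring.cloud.stream, that the binder passes everything under configuration through to the Kafka clients, and that the Kerberos service name moves to the sasl.kerberos.service.name client property. The keytab path, principal, and placeholders are copied from the question; the truststore settings are unchanged and omitted here for brevity.

```yaml
spring:
  cloud:
    stream:
      binders:
        binder1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${eventhub.broker.hosts2}
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          mechanism: GSSAPI
                          kerberos:
                            service:
                              name: kafka   # assumption: replaces serviceName from the jaas options
                          jaas:
                            # Per-binder JAAS entry; replaces the binder-level jaas block
                            config: >-
                              com.sun.security.auth.module.Krb5LoginModule required
                              useKeyTab=true
                              storeKey=true
                              keyTab="/scratch/kafka/kafka2/krb5.keytab"
                              principal="kafka/XXXXXXXXXXXXXXXX.COM";
        binder2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${eventhub.broker.hosts}
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          mechanism: SCRAM-SHA-256
                          jaas:
                            # Per-binder JAAS entry for the SCRAM cluster
                            config: >-
                              org.apache.kafka.common.security.scram.ScramLoginModule required
                              username="${eventhub.broker.user}"
                              password="${eventhub.broker.password}";
```

Each sasl.jaas.config value is a single JAAS entry (login module class, control flag, options) terminated by a semicolon. The folded scalar (>-) is used only to keep the lines readable; YAML joins them with spaces into one string. With this in place, the binder-level jaas blocks from the original configuration should no longer be needed.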