NullPointerException exception while connecting to Kafka broker with SASL/SCRAM


We have configured two Kafka brokers in application.yaml, one with SASL/KERBEROS and the other with SASL/SCRAM. While starting the service, it connects to the SASL/KERBEROS broker but fails with the error below for the other broker (SASL/SCRAM). When we configure only the SASL/SCRAM broker in the application YAML, it connects without any error.

    main] o.a.k.c.s.a.SaslClientAuthenticator      Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
    main] o.a.k.c.s.a.SaslClientAuthenticator      Set SASL client state to SEND_HANDSHAKE_REQUEST
    main] o.a.k.c.s.a.SaslClientAuthenticator      Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
    main] o.a.k.c.s.a.SaslClientAuthenticator      Set SASL client state to INITIAL
    main] o.apache.kafka.common.network.Selector   Unexpected error from 100.76.140.194; closing connection

    java.lang.NullPointerException: null
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:389)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:296)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:237)

Application.YAML

  binders:
   binder1:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          replication-factor: 1
          brokers: ${eventhub.broker.hosts2}
          zkNodes: ${eventhub.zookeper.hosts2}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: GSSAPI
            ssl:
              truststore:
                location: ${eventhub.broker.cert.location2}
                password: ${eventhub.broker.cert.password2}

          jaas:
           options:
            useKeyTab: true
            storeKey: true
            keyTab: /scratch/kafka/kafka2/krb5.keytab
            serviceName: kafka
            principal: kafka/XXXXXXXXXXXXXXXX.COM
         default:
          consumer:
           autoCommitOffset: false

   binder2:
    type: kafka
    environment:
     spring:
      cloud:
       stream:
        kafka:
         binder:
          brokers: ${eventhub.broker.hosts} # 10.40.158.93:9093
          zkNodes: ${eventhub.zookeper.hosts} #10.40.158.93:2181
          autoCreateTopics: false
          zkConnectionTimeout: 36000
          headers: 
           - event
           - sourceSystem
           - userId
           - branchCode
           - kafka_messageKey
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: ${eventhub.broker.user}
              password: ${eventhub.broker.password}
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            ssl:
              enabled:
              truststore:
                location: ${eventhub.broker.cert.location}
                password: ${eventhub.broker.cert.password}

1 Answer

Answered by sobychacko

When you have multiple clusters with different security contexts within a single application, instead of relying on JAAS configuration set through the binder or on the `java.security.auth.login.config` system property, you need to use the approach described in KIP-85: set the `sasl.jaas.config` property, which takes precedence over the other methods. The JVM uses a single static, JVM-wide security context, so any JAAS configuration loaded after the first one is ignored; by using `sasl.jaas.config`, you configure JAAS per client and sidestep that restriction.
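As a sketch of what that looks like for the SCRAM binder from the question (property names and placeholders are taken from the question's YAML; the binder-level `jaas:` block is replaced by an inline `sasl.jaas.config` entry, and indentation must match the rest of the binder's `environment` block):

```yaml
binder2:
  type: kafka
  environment:
    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: ${eventhub.broker.hosts}
              configuration:
                security.protocol: SASL_SSL
                sasl.mechanism: SCRAM-SHA-256
                # Inline per-client JAAS config (KIP-85); takes precedence over
                # any static JVM-wide JAAS file or binder-level jaas: block.
                sasl.jaas.config: >-
                  org.apache.kafka.common.security.scram.ScramLoginModule required
                  username="${eventhub.broker.user}"
                  password="${eventhub.broker.password}";
```

Because each binder carries its own `sasl.jaas.config`, the GSSAPI binder can supply a `Krb5LoginModule` entry the same way, and the two security contexts no longer clash.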

Here is a sample application that demonstrates how to connect to multiple Kafka clusters with different security contexts as a multi-binder application.