JAAS configuration for Managed Identity for Kafka-Connect in Azure

I am using Kafka Connect to move data from Azure Event Hubs to Snowflake. I need to use a managed identity in Azure to connect to Event Hubs, but the connection fails.

I am using the configuration below:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
request.timeout.ms=60000
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/xxxxxxxx/oauth2/v2.0/authorize
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='xxxxxxxxx' clientSecret='xxxxxxxx';

The managed identity has the Data Owner role on the Event Hub. I start Connect with the command below:

sh connect-distributed.sh ../config/connect-distributed.properties

I am getting the error below:

ERROR Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (StringReader); line: 3, column: 2] (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318)
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (StringReader); line: 3, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2005)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3076)
    at org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever.parseAccessToken(HttpAccessTokenRetriever.java:286)
    at org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever.retrieve(HttpAccessTokenRetriever.java:183)
    at org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.handleTokenCallback(OAuthBearerLoginCallbackHandler.java:244)
    at org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.handle(OAuthBearerLoginCallbackHandler.java:233)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:747)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:672)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:670)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:670)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:581)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:524)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:57)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:553)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:57)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:524)
    ... 4 more
Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:747)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:672)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:670)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:670)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:581)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    ... 8 more
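
A note on reading the trace: the failure happens in HttpAccessTokenRetriever.parseAccessToken, and the parser complains about a '<' character, so the body returned by the configured token endpoint was apparently not JSON (it looks like HTML or XML). A minimal sketch of that distinction follows, in Python purely for illustration. The helper name describe_token_response and the sample bodies are hypothetical and not part of Kafka or Azure. Whether the /authorize URL in the configuration is the cause is an assumption: an authorize endpoint is meant for interactive browser sign-in and typically answers with an HTML page, while a client-credentials login expects a JSON body containing an access_token.

```python
import json


def describe_token_response(body: str) -> str:
    """Classify a response body the way a JSON-only token client would see it.

    Hypothetical helper for illustration; it does not call any endpoint.
    """
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # A body whose first non-blank character is '<' is markup, not JSON.
        if body.lstrip().startswith("<"):
            return "not JSON: looks like HTML/XML"
        return "not JSON"
    if isinstance(parsed, dict) and "access_token" in parsed:
        return "JSON with access_token"
    return "JSON without access_token"


# Made-up sample bodies, not real responses.
html_body = "\n\n<!DOCTYPE html>\n<html><body>Sign in</body></html>"
json_body = '{"token_type": "Bearer", "expires_in": 3599, "access_token": "abc"}'

print(describe_token_response(html_body))
print(describe_token_response(json_body))
```

Saving the raw body that the configured URL returns and passing it through a check like this would show whether the endpoint is answering with a sign-in page rather than a token.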
