I am using Kafka Connect to move data from Azure Event Hubs to Snowflake. I need to use an Azure managed identity to connect to Event Hubs, but the connection fails.
I am using the config below:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
request.timeout.ms=60000
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/xxxxxxxx/oauth2/v2.0/authorize
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='xxxxxxxxx' clientSecret='xxxxxxxx';
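As a sanity check on the config above, here is a minimal Python sketch that parses the properties and flags two settings that look suspicious to me. It is not a confirmed diagnosis. The function names `parse_properties` and `check_oauth_config` are hypothetical helpers. The checks rest on two assumptions: that the Microsoft identity platform issues tokens from the `/oauth2/v2.0/token` path while `/oauth2/v2.0/authorize` serves the interactive sign-in page, and that the Event Hubs Kafka endpoint expects `SASL_SSL` rather than `SASL_PLAINTEXT`.

```python
from urllib.parse import urlparse


def parse_properties(text):
    """Parse simple key=value lines, splitting only on the first '='."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_oauth_config(props):
    """Return a list of warnings for settings that look suspicious (assumed checks)."""
    warnings = []

    # Assumption: /authorize is the browser sign-in page and returns HTML,
    # while the client-credentials flow needs the /token endpoint.
    url = props.get("sasl.oauthbearer.token.endpoint.url", "")
    if urlparse(url).path.endswith("/authorize"):
        warnings.append("token endpoint URL ends in /authorize, expected /token")

    # Assumption: the Event Hubs Kafka endpoint only accepts TLS connections.
    if props.get("security.protocol") == "SASL_PLAINTEXT":
        warnings.append("security.protocol is SASL_PLAINTEXT, expected SASL_SSL")

    return warnings


if __name__ == "__main__":
    sample = (
        "security.protocol=SASL_PLAINTEXT\n"
        "sasl.oauthbearer.token.endpoint.url="
        "https://login.microsoftonline.com/xxxxxxxx/oauth2/v2.0/authorize\n"
    )
    for warning in check_oauth_config(parse_properties(sample)):
        print("WARNING:", warning)
```

Run against my config, this flags both the endpoint URL and the security protocol, so those are the first two settings I would look at.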
The managed identity has the Data Owner role on the Event Hub.
I start Connect with the command below:
sh connect-distributed.sh ../config/connect-distributed.properties
I am getting the error below:
ERROR Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (StringReader); line: 3, column: 2] (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318)
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false') at [Source: (StringReader); line: 3, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2005)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:802)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3076)
    at org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever.parseAccessToken(HttpAccessTokenRetriever.java:286)
    at org.apache.kafka.common.security.oauthbearer.secured.HttpAccessTokenRetriever.retrieve(HttpAccessTokenRetriever.java:183)
    at org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.handleTokenCallback(OAuthBearerLoginCallbackHandler.java:244)
    at org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.handle(OAuthBearerLoginCallbackHandler.java:233)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:747)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:672)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:670)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:670)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:581)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:524)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:57)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:553)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:57)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:524)
    ... 4 more
Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:747)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:672)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:670)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:670)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:581)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
    at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
    ... 8 more
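The trace above shows the JSON parser hitting a '<' while `HttpAccessTokenRetriever.parseAccessToken` reads the token response, which suggests the endpoint returned markup (for example an HTML page) instead of a JSON token. Below is a rough Python sketch of how a response body could be classified to confirm that. The function `describe_token_response` and the sample bodies are hypothetical and not part of Kafka. The `access_token` field name is the standard OAuth 2.0 one.

```python
import json


def describe_token_response(body):
    """Classify a token endpoint response body (illustrative helper only)."""
    stripped = body.lstrip()

    # A leading '<' is what the Jackson error in the trace complains about;
    # it usually means HTML or XML came back instead of JSON.
    if stripped.startswith("<"):
        return "markup"

    try:
        parsed = json.loads(stripped)
    except json.JSONDecodeError:
        return "not-json"

    # A successful OAuth 2.0 token response is a JSON object with access_token.
    if isinstance(parsed, dict) and "access_token" in parsed:
        return "token"
    return "json-without-token"


if __name__ == "__main__":
    # Hypothetical bodies: blank lines followed by an HTML page, then a token.
    print(describe_token_response("\r\n\r\n<!DOCTYPE html><html></html>"))
    print(describe_token_response('{"access_token": "abc", "expires_in": 3599}'))
```

To use it, I would save the raw body returned by the URL in `sasl.oauthbearer.token.endpoint.url` (for example from an HTTP client) and pass it to this function. A result of "markup" would match the error in the trace.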