Getting SSL Exception While Connecting to a Kafka Cluster with Spring Boot and Apache Camel

I need to connect to a Kafka topic over SSL using Spring Boot and Apache Camel. I wrote the code below, but it fails with this error:

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Can anyone please help me resolve this error?

// In this class I configure SSL
    @Configuration
    public class Testing {
        @Bean
         SSLContextParameters sslContextParameters(){
            KeyStoreParameters store = new KeyStoreParameters();
            store.setResource("kafka.client.truststore.jks");
            store.setPassword("123456");
    
            TrustManagersParameters trust = new TrustManagersParameters();
            trust.setKeyStore(store);
    
            SSLContextParameters parameters = new SSLContextParameters();
            parameters.setTrustManagers(trust);
    
            return parameters;
          }
    
    }

In the class below, the route passes the SSL settings through the sslContextParameters option:

    @Autowired
    SSLContextParameters params;

    @Override
    public void configure() throws Exception {
        from("{{timerOnce}}").process(consumerCreate).to(
                "https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");
    }

I also tried another approach for connecting to the Kafka cluster over SSL, but with no luck. It fails with this exception:

org.apache.camel.spring.boot.CamelSpringBootInitializationException: java.io.IOException: Invalid Keystore format

In the code below, I enable SSL:

public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {

        KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
        // Change this path to point to your truststore/keystore as jks files
        keyStoreParameters.setResource("kafka.client.truststore.jks");
        keyStoreParameters.setPassword("123456");

        KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
        keyManagersParameters.setKeyStore(keyStoreParameters);
        keyManagersParameters.setKeyPassword("123456");

        TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
        trustManagersParameters.setKeyStore(keyStoreParameters);

        SSLContextParameters sslContextParameters = new SSLContextParameters();
        sslContextParameters.setKeyManagers(keyManagersParameters);
        sslContextParameters.setTrustManagers(trustManagersParameters);

        HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
        httpComponent.setSslContextParameters(sslContextParameters);


        // This is important to make your cert skip CN/Hostname checks
        httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
            @Override
            public void verify(String s, SSLSocket sslSocket) throws IOException {

            }

            @Override
            public void verify(String s, X509Certificate x509Certificate) throws SSLException {

            }

            @Override
            public void verify(String s, String[] strings, String[] strings1) throws SSLException {

            }

            @Override
            public boolean verify(String s, SSLSession sslSession) {
                // I don't mind just return true for all or you can add your own logic
                return true;
            }

        });

        return httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
    }

In the router below, I use that endpoint:

    public void configure() throws Exception {

        Endpoint createEndpoint = cdcHelper.setupSSLConext(context);

        from("{{timerOnce}}").process(consumerCreate)
                .to(createEndpoint);    // calling kafka consumer 

    }
}
There is 1 answer

Answer by PraveenKumar K G

You can follow the approach below to set up a Kafka consumer using Apache Camel and Spring Boot.

Add the following properties to your application.properties:

# kafka configuration
kafka.topic=iot1
kafka.camelKafkaOptions.groupId=grp1
kafka.camelKafkaOptions.brokers=kafka.localtest:9093
kafka.camelKafkaOptions.consumersCount=10
kafka.camelKafkaOptions.autoOffsetReset=latest
kafka.camelKafkaOptions.autoCommitEnable=false
kafka.camelKafkaOptions.allowManualCommit=true
kafka.camelKafkaOptions.metadataMaxAgeMs=5000
kafka.camelKafkaOptions.securityProtocol=SSL
kafka.camelKafkaOptions.sslEndpointAlgorithm=HTTPS
kafka.camelKafkaOptions.sslKeyPassword=<ssl key password>
kafka.camelKafkaOptions.sslKeystoreLocation=<keystorepath>
kafka.camelKafkaOptions.sslKeystorePassword=<sslkeystore password>
kafka.camelKafkaOptions.sslTruststoreLocation=<truststore path>
kafka.camelKafkaOptions.sslTruststorePassword=<password>
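
The answer does not show how the kafka.camelKafkaOptions.* entries end up in a map. In a Spring Boot application this is typically done by property binding. The plain-Java sketch below only illustrates the idea of stripping the prefix so that each remaining name matches a Camel Kafka option. The class name CamelKafkaOptionsSketch and the method extract are hypothetical and not part of the original answer.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: collects every "<prefix>name=value" entry into a name -> value map.
final class CamelKafkaOptionsSketch {

    static Map<String, String> extract(Properties properties, String prefix) {
        Map<String, String> options = new TreeMap<>();   // sorted for stable output
        for (String key : properties.stringPropertyNames()) {
            // keep only keys under the prefix and drop the prefix itself
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                options.put(key.substring(prefix.length()), properties.getProperty(key));
            }
        }
        return options;
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader(
                "kafka.topic=iot1\n"
              + "kafka.camelKafkaOptions.groupId=grp1\n"
              + "kafka.camelKafkaOptions.brokers=kafka.localtest:9093\n"
              + "kafka.camelKafkaOptions.securityProtocol=SSL\n"));
        // prints {brokers=kafka.localtest:9093, groupId=grp1, securityProtocol=SSL}
        System.out.println(extract(properties, "kafka.camelKafkaOptions."));
    }
}
```

kafka.topic is left out of the map because it does not sit under the option prefix; it is read separately as the topic name.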

Then create a utility method to construct the Kafka endpoint URL:

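@Component
public class KafkaUtility {

    // Builds a Camel Kafka endpoint URI such as "kafka:<topic>?key=value&key2=value2".
    // getCamelKafkaOptions(), appendConfig() and stripLastAnd() are helpers not shown here;
    // appendConfig() is presumably expected to append "key=value&" to the builder.
    public String getKafkaEndpoint(String topicName) {
        StringBuilder urlBuilder = new StringBuilder("kafka:" + topicName);

        if (!getCamelKafkaOptions().isEmpty()) {
            // the query string starts with "?"; the options are then joined with "&"
            urlBuilder.append("?");
            getCamelKafkaOptions().forEach((key, value) -> {
                if (StringUtils.isNotBlank(value)) {
                    appendConfig(urlBuilder, key, value);
                }
            });
        }
        // strip the trailing "&" symbol
        return stripLastAnd(urlBuilder.toString());
    }
}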
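
The utility above relies on helpers (getCamelKafkaOptions, appendConfig, stripLastAnd) whose bodies are not shown. As a rough, self-contained sketch of what the URL construction could look like, the version below takes the options as a map and needs no trailing-separator cleanup. Note that a Camel endpoint URI separates the topic from its options with "?" and the options from each other with "&". The names KafkaUriSketch and buildKafkaUri are hypothetical, not the author's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the helper methods the answer does not show.
final class KafkaUriSketch {

    /** Builds "kafka:<topic>?key=value&key2=value2", skipping blank values. */
    static String buildKafkaUri(String topicName, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("kafka:").append(topicName);
        char separator = '?';                       // first option follows '?', later ones '&'
        for (Map.Entry<String, String> option : options.entrySet()) {
            String value = option.getValue();
            if (value == null || value.trim().isEmpty()) {
                continue;                           // unset options are left out of the URI
            }
            uri.append(separator).append(option.getKey()).append('=').append(value);
            separator = '&';
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "kafka.localtest:9093");
        options.put("groupId", "grp1");
        options.put("securityProtocol", "SSL");
        options.put("sslKeystoreLocation", "");     // blank, so it is skipped
        // prints kafka:iot1?brokers=kafka.localtest:9093&groupId=grp1&securityProtocol=SSL
        System.out.println(buildKafkaUri("iot1", options));
    }
}
```

This sketch does no escaping. If a password contains characters such as "&" or "+", Camel's RAW(...) syntax for option values is one way to pass it through unchanged.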

In your route builder, implement the following:

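    @Autowired
    private KafkaUtility kafkaUtility;

    // topic name taken from the kafka.topic property
    @Value("${kafka.topic}")
    private String topicName;

    @Override
    public void configure() throws Exception {
        from(kafkaUtility.getKafkaEndpoint(topicName))
            .process("yourprocessor")   // name of your Processor bean
            .to("tourl");               // placeholder for the target endpoint URI
    }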
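
Both errors in the question point at the store files themselves: "Invalid Keystore format" typically means the bytes being read are not a store of the expected type, and the PKIX failure typically means the truststore that was actually used does not contain a certificate that validates the server's chain. Before putting a path into sslTruststoreLocation or sslKeystoreLocation, it may help to check the file with the JDK's own KeyStore API. The sketch below is a hypothetical diagnostic helper (TruststoreCheck and listAliases are made-up names), not part of Camel or Kafka.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Camel or Kafka.
final class TruststoreCheck {

    /** Loads the store with the given type and returns its aliases; throws if it cannot be read. */
    static List<String> listAliases(byte[] storeBytes, String type, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance(type);
        try (InputStream in = new ByteArrayInputStream(storeBytes)) {
            store.load(in, password);   // fails on a wrong format or a wrong password
        }
        return Collections.list(store.aliases());
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: TruststoreCheck <store file> <password>");
            return;
        }
        byte[] bytes = Files.readAllBytes(Paths.get(args[0]));
        // Newer JDKs may accept either format under both type names.
        for (String type : new String[] {"JKS", "PKCS12"}) {
            try {
                System.out.println(type + " aliases: " + listAliases(bytes, type, args[1].toCharArray()));
            } catch (Exception e) {
                System.out.println(type + " failed: " + e.getMessage());
            }
        }
    }
}
```

If the file loads but the alias list does not include the CA that signed the broker's certificate, that would be consistent with the PKIX error. If it fails to load under every type, the file may be damaged or may not be a keystore at all.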