Kafka client with Kerberos: "Node -1 disconnected"


Please help me figure out the Kafka producer settings. I need to write a test that creates a Kafka producer object and sends a message. Authentication in Kafka is configured via Kerberos. When I try to send a message, the client logs "Node -1 disconnected".

public static Properties getDefaultProperties(){
        Properties pp = new Properties();
        pp.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX:6667");
        pp.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pp.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pp.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        pp.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        pp.setProperty(KafkaPrincipal.USER_TYPE, "XXXX@XXXXXXX");
        System.setProperty("sun.security.krb5.debug","true");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        pp.setProperty("java.security.krb5.conf", "C:/krb5.conf");
        pp.setProperty("keyTab", "C:/XXXXXXX.keytab");
        pp.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "KafkaClient {\n" +
                        "            com.sun.security.auth.module.Krb5LoginModule required\n" +
                        "            useKeyTab=true\n" +
                        "            keyTab=\"C:/XXXXXXXXX.keytab\"\n" +
                        "            storeKey=true\n" +
                        "            useTicketCache=false\n" +
                        "            serviceName=\"kafka\"\n" +
                        "            isInitiator=true\n" +
                        "            debug=${smdp.kerberos.debug-enabled}\n" +
                        "            principal=\"XXXX@XXXXXXX\";\n" +
                        "        }");


        pp.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        pp.setProperty(ProducerConfig.ACKS_CONFIG,"all");
        pp.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
        pp.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        pp.setProperty(ProducerConfig.LINGER_MS_CONFIG,"20"); // linger for 20ms
        pp.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32KBytes

        return pp;
    }

I've tried different configuration options, but nothing helps. I would be very grateful if anyone could point me in the right direction.
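For comparison, here is a minimal sketch of how the Kerberos-related properties are often laid out. It is not a confirmed fix. It rests on three assumptions: the broker listener on port 6667 uses SASL without TLS (otherwise `SASL_SSL` would be needed), `security.protocol` has to be set explicitly because the client default is `PLAINTEXT`, and the value of `sasl.jaas.config` is a single login-module line ending in `;` rather than a `KafkaClient { ... }` block from a JAAS file. The class name `KerberosProducerProps` and its helper methods are hypothetical, and plain string keys are used so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class KerberosProducerProps {

    // Builds the inline JAAS entry for sasl.jaas.config: one login-module line
    // terminated by ';', with no "KafkaClient { ... }" wrapper around it.
    static String jaasConfig(String principal, String keytabPath) {
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " useKeyTab=true"
                + " storeKey=true"
                + " useTicketCache=false"
                + " keyTab=\"" + keytabPath + "\""
                + " principal=\"" + principal + "\";";
    }

    // Assembles producer properties for a Kerberos (GSSAPI) secured cluster.
    static Properties build(String bootstrapServers, String principal, String keytabPath) {
        Properties pp = new Properties();
        pp.setProperty("bootstrap.servers", bootstrapServers);
        pp.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        pp.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: SASL over a non-TLS listener; switch to "SASL_SSL" for a TLS port.
        pp.setProperty("security.protocol", "SASL_PLAINTEXT");
        pp.setProperty("sasl.mechanism", "GSSAPI");
        pp.setProperty("sasl.kerberos.service.name", "kafka");
        pp.setProperty("sasl.jaas.config", jaasConfig(principal, keytabPath));
        return pp;
    }

    public static void main(String[] args) {
        // The krb5.conf location is a JVM system property, not a producer property.
        System.setProperty("java.security.krb5.conf", "C:/krb5.conf");
        Properties pp = build("XXXXXXXXXX:6667", "XXXX@XXXXXXX", "C:/XXXXXXX.keytab");
        System.out.println(pp.getProperty("sasl.jaas.config"));
    }
}
```

The resulting `Properties` object could be passed to `new KafkaProducer<>(pp)` in place of the one returned by `getDefaultProperties()`. If the disconnect persists, the output of `sun.security.krb5.debug` and the broker-side log are likely the next places to look.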
