How do I resolve an error in my configuration when trying to unit test a Spring Kafka Consumer?

Code Location

I think there might be too many modules to keep this question clean, so here is the repo. I will try to include all the necessary components below. https://github.com/ewingian/RestCalculator

Problem

I am learning to write Kafka services, which includes learning to unit test the producer and consumer. I followed a tutorial on setting up unit tests for the consumer. When I run the test, the embedded Kafka server fails to start with a NoClassDefFoundError.

ERROR

9:34:59.547 [main] WARN kafka.server.BrokerMetadataCheckpoint - No meta.properties file under dir /tmp/kafka-8346130278143417083/meta.properties
09:34:59.567 [main] ERROR kafka.server.KafkaServer - [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType
    at kafka.network.Processor.<init>(SocketServer.scala:406)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:141)
    at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:94)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:93)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at kafka.network.SocketServer.startup(SocketServer.scala:89)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:219)
    at kafka.utils.TestUtils$.createServer(TestUtils.scala:120)
    at kafka.utils.TestUtils.createServer(TestUtils.scala)
    at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:154)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.LoginType
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 23 common frames omitted

The picture shows my directory structure: [image: project directory structure]

When I look at the list of libraries in the project structure settings of IDEA, I see org.apache.kafka:kafka-clients:0.11.0.0; however, I cannot import the missing class (org/apache/kafka/common/network/LoginType), which I understand is part of kafka-clients.
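One way to check this from inside the test classpath, rather than from the IDE's library list, is to ask the class loader directly. The following is a minimal sketch, not part of the original project: the class name ClasspathProbe and the helper locate are hypothetical, and it assumes the probe is run with the same classpath as the failing test (for example from src/test/java). It reports which jar each class was loaded from, or that the class is missing. If LoginType is missing from the kafka-clients jar that is actually loaded while kafka.server.KafkaServer is found, the broker jar and the client jar on the test classpath are probably from different Kafka versions.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /**
     * Returns a description of where the class was loaded from,
     * or null if the class cannot be found on the current classpath.
     */
    public static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK have no code source
            return source == null ? "(JDK class)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.common.network.LoginType",       // the class from the stack trace
            "org.apache.kafka.clients.consumer.KafkaConsumer", // shows which kafka-clients jar is loaded
            "kafka.server.KafkaServer"                         // shows which broker jar is loaded
        };
        for (String name : names) {
            String location = locate(name);
            System.out.println(name + " -> " + (location == null ? "NOT FOUND" : location));
        }
    }
}
```

The jar file names in the output include version numbers, so a mismatch between the broker jar and the kafka-clients jar should be visible at a glance.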

Question

Has anyone come across this error before? Have I misconfigured my Gradle file? Is my project directory set up correctly to perform Kafka unit testing effectively? What might I be missing? I have not found much information on LoginType yet, but will keep searching.

Here is a copy of my Gradle build file:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.10.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'calculator'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    compile("org.springframework.kafka:spring-kafka:1.3.2.RELEASE")
    testCompile("org.springframework.kafka:spring-kafka-test")
}

task wrapper(type: Wrapper) {
    gradleVersion = '2.3'
}

If there is anything else I should include in this question, please let me know. Thanks.

Unit Test Code

package com.calculator;

/**
 * Created by ian on 2/9/18.
 */
import com.calculator.kafka.services.*;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

//    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
    private static final String TEMPLATE_TOPIC = "input";
    private static String SENDER_TOPIC = "input";
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC);

    private KafkaMessageListenerContainer<String, Integer> container;

    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    @Autowired
    private KafkaConsumer consumer;

    @Before
    public void setUp() {
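        // Initialize the queue that collects received records; without this,
        // records.add(...) and records.poll(...) throw a NullPointerException
        records = new java.util.concurrent.LinkedBlockingQueue<>();

        // Set up the consumer properties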
        Map<String, Object> integerProperties = KafkaTestUtils.consumerProps("jsa-group", "false", embeddedKafka);

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<String, Integer>(integerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
//                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();
    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testTemplate() throws Exception {
        // send the message
        String greeting = "Hello Spring Kafka Sender!";
        Integer i1 = 12;
        consumer.processMessage(i1);

        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));
        // AssertJ Condition to check the key
        assertThat(received).has(key(null));
    }
}

There is 1 answer

Here is a summary of what I discovered:

  1. I was not using the most up-to-date Spring Boot version. Following the tutorials, I used 1.5.10.RELEASE. There is nothing wrong with that in itself, but it had compatibility issues with Spring Kafka.
  2. I tried using an up-to-date version of Spring Kafka with Spring Boot 1.5.10.RELEASE, but kept getting errors about certain classes not being found, so I had to lower the Spring Kafka version to 1.3.2.RELEASE.
  3. This configuration allowed me to run my Spring Boot application, but the unit tests failed, hence this Stack Overflow question.
  4. I attempted to rewrite my Gradle file to use a newer Spring Boot and Spring Kafka, but that failed; I think the repositories were no good.

SOLUTION

Finally, I went to Spring's website and used their project generator, pulling in the latest Spring Kafka and Spring Boot. This gave me a fresh build.gradle with the proper repositories. I then added spring-kafka-test manually, and the unit tests ran successfully.