Code Location
I think there might be too many modules to keep this question tidy, so here is the repo; I will try to include all the necessary components below. https://github.com/ewingian/RestCalculator
Problem
I am learning to write Kafka services, which includes learning to unit test the producer and consumer. I followed a tutorial on setting up unit tests for the consumer. When I run the test, the embedded Kafka server fails to start with a NoClassDefFoundError.
ERROR
09:34:59.547 [main] WARN kafka.server.BrokerMetadataCheckpoint - No meta.properties file under dir /tmp/kafka-8346130278143417083/meta.properties
09:34:59.567 [main] ERROR kafka.server.KafkaServer - [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType
    at kafka.network.Processor.<init>(SocketServer.scala:406)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:141)
    at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:94)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:93)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at kafka.network.SocketServer.startup(SocketServer.scala:89)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:219)
    at kafka.utils.TestUtils$.createServer(TestUtils.scala:120)
    at kafka.utils.TestUtils.createServer(TestUtils.scala)
    at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:154)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.LoginType
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 23 common frames omitted
The picture shows my directory structure
When I look at the list of libraries in IDEA's Project Structure settings, I see org.apache.kafka:kafka-clients:0.11.0.0; however, I cannot import the missing class (org/apache/kafka/common/network/LoginType), which I understood to be part of kafka-clients.
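A quick way to confirm what the IDE is showing is to ask the JVM directly whether the class is visible on the test classpath. The sketch below is only an illustration: the class name ClasspathCheck and its two helper methods are hypothetical, not part of the project or of Kafka. One possible explanation, stated here as an assumption rather than a confirmed diagnosis, is that LoginType shipped in kafka-clients 0.10.x but is no longer present in 0.11.0.0, so a broker jar built against 0.10.x would fail exactly like this when paired with the newer client jar.

```java
import java.security.CodeSource;

// Hypothetical helper, not part of the project: reports whether a class
// is visible on the current classpath and where a loaded class came from.
public class ClasspathCheck {

    /** Returns true if the named class can be located (it is not initialized). */
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a note if unknown. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "(bootstrap or unknown)" : source.getLocation().toString();
    }

    public static void main(String[] args) {
        String missing = "org.apache.kafka.common.network.LoginType";
        System.out.println(missing + " on classpath: " + isOnClasspath(missing));
        System.out.println("ClasspathCheck loaded from: " + locationOf(ClasspathCheck.class));
    }
}
```

Run from the test source set, the same locationOf call on any Kafka class that does load (for example a class from kafka-clients) would show which jar version actually ended up on the classpath.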
Question
Has anyone come across this error before? Have I misconfigured my Gradle file? Is my project directory set up correctly to perform Kafka unit testing? What might I be missing? I have not found much information on LoginType yet, but will keep searching.
Here is a copy of my Gradle build file:
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.10.RELEASE")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
jar {
    baseName = 'calculator'
    version = '0.1.0'
}
repositories {
    mavenCentral()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    compile("org.springframework.kafka:spring-kafka:1.3.2.RELEASE")
    testCompile("org.springframework.kafka:spring-kafka-test")
}
task wrapper(type: Wrapper) {
    gradleVersion = '2.3'
}
If there is anything else I should include in this question, please let me know. Thanks.
Unit Test Code
package com.calculator;

/**
 * Created by ian on 2/9/18.
 */
import com.calculator.kafka.services.*;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

    // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);

    private static final String TEMPLATE_TOPIC = "input";
    private static String SENDER_TOPIC = "input";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC);

    private KafkaMessageListenerContainer<String, Integer> container;
    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    @Autowired
    private KafkaConsumer consumer;

    @Before
    public void setUp() {
        // Set up the consumer properties
        Map<String, Object> integerProperties = KafkaTestUtils.consumerProps("jsa-group", "false", embeddedKafka);
        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<String, Integer>(integerProperties);
        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        // create the queue that collects received records; without this,
        // records.add(...) in the listener would throw a NullPointerException
        records = new LinkedBlockingQueue<>();
        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
                // LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });
        // start the container and underlying message listener
        container.start();
    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testTemplate() throws Exception {
        // send the message
        String greeting = "Hello Spring Kafka Sender!";
        Integer i1 = 12;
        consumer.processMessage(i1);
        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));
        // AssertJ Condition to check the key
        assertThat(received).has(key(null));
    }
}
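The test above passes records from the listener thread to the test method through a BlockingQueue. Here is that hand-off in isolation, with no Kafka involved, as a minimal sketch: the class QueueHandOff and its methods are hypothetical names used only for illustration. The point it shows is that the queue has to exist before the listener runs (otherwise records.add would throw a NullPointerException), and that poll with a timeout returns null when nothing arrives in time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener/test hand-off; not Spring Kafka code.
public class QueueHandOff {

    // Created eagerly so the listener callback always has a queue to add to.
    private final BlockingQueue<Integer> records = new LinkedBlockingQueue<>();

    /** Stand-in for the listener callback: stores whatever it receives. */
    public void onMessage(Integer value) {
        records.add(value);
    }

    /** Waits up to the given timeout for a value; returns null if none arrived. */
    public Integer awaitNext(long timeout, TimeUnit unit) throws InterruptedException {
        return records.poll(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        QueueHandOff handOff = new QueueHandOff();
        // Simulate the listener container delivering a message on another thread.
        Thread listener = new Thread(() -> handOff.onMessage(12));
        listener.start();
        Integer received = handOff.awaitNext(10, TimeUnit.SECONDS);
        System.out.println("received = " + received);
        listener.join();
    }
}
```

Because poll can return null on timeout, a test built on this pattern may want to assert that the received record is not null before checking its value or key.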
Here is a summary of what I discovered:
SOLUTION
In the end I went to Spring's website and used their project generator to pull in the latest Spring Kafka and Spring Boot. This gave me a fresh build.gradle with the proper repositories. I then added spring-kafka-test manually, and the unit tests ran successfully.