I am new with Kafka and I am trying to read a text file and create a list of strings that I want to send for the consumers. I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT).
This is the Kafka producer configuration:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig(){
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
return properties;
}
@Bean
public ProducerFactory<String, List<String>> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
return new KafkaTemplate<>(producerFactory);
}
}
This is the Topic configuration:
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic alexTopic(){
return TopicBuilder.name("securityTopic")
.build();
}
}
This is the "application.properties":
spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092
This is how I am receiving the file:
public Optional uploadFile(MultipartFile file){
if(file.isEmpty()){
return Optional.of(new EmptyFileException("File is empty please double check"));
}
return Optional.of(this.upload.uploadFile(file));
}
This is how I am sending the "List<String>" to kafka:
public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String uploadFile(MultipartFile file) {
try(BufferedReader reader = new BufferedReader(
new InputStreamReader(file.getInputStream()))){
String line;
int i=0;
List<String> list = new ArrayList<>();
while ((line=reader.readLine())!=null) {
if(i==10){
this.kafkaTemplate.send("securityTopic",list);
If I am sending pure Strings and I am using:
StringSerializer.class
instead of:
ListSerializer.class.getName()
in the Kafka producer configuration, and also change the the other generics to String instead of List<String> is working. But how can I send List<String> then? Again I am using Kafka for 2 or 3 days and I don't know if I have to create my own serializer or use something that is built in?
The error that gives me is as in the title: "Failed to construct kafka producer".
I have tried also to use "ListSerializer.class" also but is not working.
I also tried to use "JsonSerialize.class" and is still the same error.
Thank you for your time to read this!
It was the wrong import. the correct import is:
"import org.springframework.kafka.support.serializer.JsonSerializer;"