I am trying to deserialize Avro data into an object, but I get an error when deserializing with either KafkaAvroDeserializer or my custom deserializer.
Custom deserializer:
/**
 * Kafka {@link Deserializer} that decodes an Avro binary payload into an instance of a
 * generated {@link SpecificRecordBase} subclass.
 *
 * <p>The reader schema is taken from a fresh instance of {@code targetType}, so that class
 * must expose a no-argument constructor.
 *
 * <p>NOTE(review): the payload is handed to the binary decoder as-is, i.e. it is expected to
 * be plain Avro binary. Records written by Confluent's {@code KafkaAvroSerializer} carry a
 * 5-byte prefix (magic byte + 4-byte schema id) in front of the Avro body; if the producer
 * uses that serializer, decoding starts inside the prefix and typically fails with errors
 * such as {@code ArrayIndexOutOfBoundsException}. Confirm which serializer the producer uses.
 *
 * @param <T> generated Avro record type produced by this deserializer
 */
@Log4j2
@RequiredArgsConstructor
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    /** Generated Avro class whose schema is used to read the payload. */
    protected final Class<T> targetType;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No-op
    }

    /**
     * Decodes {@code data} into a {@code T}.
     *
     * @param topic topic the record came from; only used in the error message
     * @param data  raw record value, may be {@code null}
     * @return the decoded record, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the schema cannot be obtained or decoding fails
     */
    @Override
    public T deserialize(String topic, byte[] data) {
        // The return type must be T (not a concrete record class) to satisfy Deserializer<T>.
        if (data == null) {
            return null;
        }
        try {
            log.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            DatumReader<T> datumReader =
                    new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            T result = datumReader.read(null, decoder);
            log.debug("deserialized data='{}'", result);
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }

    @Override
    public void close() {
        // No-op
    }
}
Consumer configuration:
import com.kafka.dto.MyGeneratedAvroWithMavenPlugin;
import com.utils.AvroDeserializer;
import com.utils.CryptoUtils;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Spring configuration that builds the Kafka consumer properties, the consumer factory and
 * the listener container factory for {@link MyGeneratedAvroWithMavenPlugin} records.
 */
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConsumerConfiguration {

    private final KafkaProperties kafkaProperties;

    /**
     * Builds the consumer property map from {@code kafkaProperties}.
     *
     * <p>SSL settings are added only when an SSL section is present and enabled.
     *
     * @return mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getAutoOffsetReset());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isEnableAutoCommit());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, kafkaProperties.getIsolationLevelConfig());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaProperties.getHeartbeatIntervalMsConfig());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getSessionTimeoutMsConfig());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
        // NOTE(review): the two settings below are Confluent KafkaAvroDeserializer options;
        // presumably they have no effect while the custom AvroDeserializer is in use — confirm.
        props.put("schema.registry.url", kafkaProperties.getSchemaRegistryUrl());
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // kafkaProperties itself is already dereferenced above, so only the SSL section
        // needs a null check here.
        if (Objects.nonNull(kafkaProperties.getSsl()) && kafkaProperties.getSsl().isEnabled()) {
            props.put("security.protocol", kafkaProperties.getSecurity().getProtocol());
            props.put("ssl.truststore.location", kafkaProperties.getSsl().getTrustStoreLocation());
            // NOTE(review): the password entries previously passed kafkaProperties.getKey() as
            // a third argument to Map.put, which does not compile. Presumably a decryption call
            // (CryptoUtils is imported) wrapped these values; restore it here if the configured
            // passwords are stored encrypted.
            props.put("ssl.truststore.password", kafkaProperties.getSsl().getTrustStorePassword());
            props.put("ssl.keystore.location", kafkaProperties.getSsl().getKeyStoreLocation());
            props.put("ssl.keystore.password", kafkaProperties.getSsl().getKeyStorePassword());
            props.put("ssl.key.password", kafkaProperties.getSsl().getKeyPassword());
        }
        return props;
    }

    /**
     * Creates the consumer factory with explicit deserializer instances: a
     * {@link StringDeserializer} for keys and an {@link AvroDeserializer} bound to
     * {@link MyGeneratedAvroWithMavenPlugin} for values.
     *
     * @return consumer factory for String keys and Avro values
     */
    @Bean
    public ConsumerFactory<String, MyGeneratedAvroWithMavenPlugin> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfig(),
                new StringDeserializer(),
                new AvroDeserializer<>(MyGeneratedAvroWithMavenPlugin.class));
    }

    /**
     * Creates the listener container factory used by {@code @KafkaListener} methods.
     *
     * @param consumerFactory factory that supplies the underlying consumers
     * @return container factory with the configured concurrency
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyGeneratedAvroWithMavenPlugin>> kafkaListenerContainerFactory(
            ConsumerFactory<String, MyGeneratedAvroWithMavenPlugin> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, MyGeneratedAvroWithMavenPlugin> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(kafkaProperties.getConcurrency());
        return factory;
    }
}
Kafka listener
import com.kafka.dto.MyGeneratedAvroWithMavenPlugin;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
 * Kafka listener that logs every received record and keeps the non-null values in memory.
 */
@Log4j2
@Service
@RequiredArgsConstructor
public class KafkaConsumer {

    // NOTE(review): plain ArrayList that only grows; if the container factory runs with
    // concurrency > 1 this list is mutated from several listener threads — confirm whether
    // a thread-safe collection is needed.
    private final List<MyGeneratedAvroWithMavenPlugin> messages = new ArrayList<>();

    /**
     * Handles one record from the topics matching {@code ${kafka.topic}}.
     *
     * @param consumerRecord record delivered by the listener container
     * @param groupID        consumer group id taken from the message headers (currently unused)
     */
    @KafkaListener(topicPattern = "${kafka.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, MyGeneratedAvroWithMavenPlugin> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String groupID) {
        log.info("Received payload key: '{}'", consumerRecord.key());
        // Pass the value itself to the logger instead of calling toString() on it, so a
        // record with a null value does not raise a NullPointerException.
        MyGeneratedAvroWithMavenPlugin value = consumerRecord.value();
        log.info("Received payload value: '{}'", value);
        if (value != null) {
            messages.add(value);
        }
    }
}
It is impossible to deserialize the Avro content using either the custom AvroDeserializer or KafkaAvroDeserializer. With the custom one, it fails on:
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = datumReader.read(null, decoder);
When deserializing, I get: `java.lang.ArrayIndexOutOfBoundsException: Index -19 out of bounds for length 2`. I need to understand why it is failing. Thanks.