Pattern to change Kafka producer type per configuration


I am trying to have some configuration that lets me produce Kafka message output in a specific format, e.g. byte array vs. JSON.

Currently, I have a Kafka producer such as:

    @Bean
    public KafkaSender<String, SomePojo> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
        final SenderOptions<String, SomePojo> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }

This works fine, i.e., we can see in the destination Kafka topic a string representing the JSON of SomePojo.

Now, in another topic, the same POJO needs to be sent in bytes.

Not knowing what to do, we duplicated an entire job, just changing this:

    @Bean
    public KafkaSender<String, byte[]> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        final SenderOptions<String, byte[]> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }

Note the SomePojo vs. byte[] and KafkaJsonSerializer vs. ByteArraySerializer. This also works fine; the messages now appear in Kafka as byte arrays.

This is tedious, and I believe it is not the correct way to do this (imagine it is not only byte array vs. JSON, but other output types in the picture).

Is there a way to keep one single bean, but make it configurable?

Such as (making this up):

    @Bean
    public KafkaSender<String, {retrieve from some configuration}> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, {retrieve from some configuration});
        final SenderOptions<String, {retrieve from some configuration}> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }
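
For example, the value serializer class itself could be read from configuration as a plain string (the property name kafka.value-serializer below is made up), although the value's generic type would still have to be fixed at compile time somehow:

    @Bean
    public KafkaSender<String, Object> kafkaSender(
            @Value("${kafka.value-serializer}") String valueSerializerClassName) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Kafka accepts a serializer either as a Class or as its fully qualified class name.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
        final SenderOptions<String, Object> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }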

1 Answer

Answered by OneCricketeer

You're going to need one serializer per unique object type, or unique producer configuration.

Yes, you could use ByteArraySerializer, then externally call your own serialization functions, such as Jackson ObjectMapper's writeValueAsBytes.
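
A minimal sketch of that approach (bean and class names here are made up for illustration; it assumes the reactor-kafka and Jackson dependencies already used in the question):

    import java.util.HashMap;
    import java.util.Map;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Mono;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;
    import reactor.kafka.sender.SenderRecord;

    @Configuration
    class ByteSenderConfig {

        // Placeholder; use the same injected bootstrap value as in the question.
        private final String bootstrapServerVariable = "localhost:9092";

        @Bean
        public KafkaSender<String, byte[]> byteArrayKafkaSender() {
            final Map<String, Object> properties = new HashMap<>();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // The sender never knows about SomePojo; it only ever ships bytes.
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            return KafkaSender.create(SenderOptions.create(properties));
        }
    }

    @Service
    class SomePojoProducer {

        private final KafkaSender<String, byte[]> sender;
        private final ObjectMapper objectMapper = new ObjectMapper();

        SomePojoProducer(KafkaSender<String, byte[]> sender) {
            this.sender = sender;
        }

        // The caller decides the wire format; here, JSON via Jackson. A different topic
        // could use a different encoding without touching the sender bean.
        void sendAsJson(String topic, String key, SomePojo pojo) throws JsonProcessingException {
            byte[] value = objectMapper.writeValueAsBytes(pojo);
            SenderRecord<String, byte[], String> record =
                    SenderRecord.create(new ProducerRecord<>(topic, key, value), key);
            sender.send(Mono.just(record)).subscribe();
        }
    }

Only one sender bean exists regardless of the output format; the trade-off is that every producing call site now has to serialize explicitly before sending.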

Similarly, you could use JsonNode instead of a POJO, and that would accept any valid JSON.
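
Roughly like this, in the same configuration class as the question's bean (assuming the KafkaJsonSerializer from the question, which delegates to Jackson and can also write a JsonNode):

    @Bean
    public KafkaSender<String, JsonNode> jsonNodeKafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
        final SenderOptions<String, JsonNode> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }

Any POJO can then be turned into a JsonNode with objectMapper.valueToTree(somePojo) before sending, so a single bean covers every JSON-shaped payload.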

> the same POJO needs to be sent in bytes

Unclear what this means. Kafka only accepts bytes, not objects. The Java generics are more related to type erasure than config management. If you provide a JsonSerializer, it's just using ObjectMapper for you to produce the byte array before sending.
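
To make that concrete, here is a small standalone sketch (assuming SomePojo is an ordinary Jackson-serializable bean with a no-args constructor) showing that the "JSON" on the topic is nothing more than the bytes Jackson produces:

    import java.nio.charset.StandardCharsets;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BytesDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // This is essentially what a JSON value serializer does right before the send.
            byte[] wireBytes = mapper.writeValueAsBytes(new SomePojo());
            // The broker only ever stores these bytes; "JSON" is just how we read them back.
            System.out.println(new String(wireBytes, StandardCharsets.UTF_8));
        }
    }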