I have a microservice that consumes a Kafka topic containing messages from a Python producer. When messages are compressed on the producer side, the NestJS consumer works fine as long as a message has at most four key-value pairs. For any message larger than that, an empty string is passed to the handler function in the consumer.
Any idea what might cause this behaviour?
Consumer configuration
import { EventsService } from './events.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { EventsController } from './events.controller';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { DbModule } from '../db/db.module';
import { readFileSync } from 'fs';
import { SnappyCodec } from 'kafkajs-snappy';
import { CompressionTypes, CompressionCodecs } from 'kafkajs';
// Register the Snappy codec in the KafkaJS codec table under the Snappy
// compression type. This runs once, as a side effect of loading this file,
// before the Kafka client below is created.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;
/**
 * Feature module for event handling.
 *
 * Wires together the database module, the global configuration module and a
 * Kafka client registered under the `EVENT_MICROSERVICE` token. All Kafka
 * connection settings (client id, broker host/port, TLS material, consumer
 * group suffix) are read from `ConfigService` at factory time.
 */
@Module({
  imports: [
    DbModule,
    ConfigModule.forRoot({
      isGlobal: true, // Makes the configuration global
    }),
    ClientsModule.registerAsync([
      {
        name: 'EVENT_MICROSERVICE',
        useFactory: (configService: ConfigService) => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              // Compression is not a client-level KafkaJS option; it is
              // configured per send via `send.compression` below.
              clientId: configService.get('kafka.client_id'),
              // Single broker address assembled as "<host>:<port>".
              brokers: [
                `${configService.get('kafka.host')}:${configService.get(
                  'kafka.port',
                )}`,
              ],
              // TLS is enabled only when `kafka.ssl` is truthy.
              ssl: configService.get('kafka.ssl')
                ? {
                    // NOTE(review): disabling `rejectUnauthorized` turns off
                    // verification of the broker certificate even though a CA
                    // is supplied. Confirm this is intended outside of local
                    // development.
                    rejectUnauthorized: false,
                    ca: [configService.get<string>('kafka.ssl_ca')],
                    cert: configService.get<string>('kafka.ssl_certificate'),
                    key: configService.get<string>('kafka.ssl_key'),
                  }
                : false,
            },
            producerOnlyMode: false,
            consumer: {
              groupId: `event-consumer-${configService.get(
                'kafka.consumer_group_id',
              )}`,
              sessionTimeout: 90000,
              heartbeatInterval: 30000,
            },
            // Outgoing messages are Snappy-compressed.
            send: {
              compression: CompressionTypes.Snappy,
            },
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
  providers: [EventsService],
  controllers: [EventsController],
})
export class EventsModule {}
Python producer
# Minimal producer script: sends one Snappy-compressed JSON event to Kafka
# over SSL.
import json
import socket

from confluent_kafka import Producer

# Producer configuration: SSL connection, Snappy compression on the wire.
conf = {
    "bootstrap.servers": "some-random-server:1234",
    "client.id": socket.gethostname(),
    "security.protocol": "SSL",
    "ssl.ca.location": "ca.pem",
    "ssl.certificate.location": "service.cert",
    "ssl.key.location": "service.key",
    'compression.type': 'snappy',
}
producer = Producer(conf)

# Sample event payload with five key-value pairs.
# NOTE(review): "original_encodingg" looks like a deliberate extra key used to
# push the payload past four pairs -- confirm before renaming.
event = {
    "id": "DC127DCC-2689-4CE7-BC42-890BB56BCC4E.1",
    "version": "1.0",
    "occurred": "2023-10-28T09:31:18.147Z",
    "original_encoding": "ascii",
    "original_encodingg": "ascii",
}

# TOPIC_NAME is expected to be defined elsewhere in the script.
producer.produce(TOPIC_NAME, key="key", value=json.dumps(event))

# produce() only enqueues the message; block until it has actually been
# delivered so the script cannot exit with the message still in the queue.
producer.flush()
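Registering the Snappy codec with KafkaJS (the `CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;` line, together with its two imports from `kafkajs-snappy` and `kafkajs`) did the trick for me.
It looks like Nest uses the KafkaJS `CompressionCodecs` object under the hood, so registering the codec there automatically adds support for Snappy.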