NestJS Kafka microservice decompressing snappy messages to empty string


I have a NestJS microservice consuming a Kafka topic that is fed by a Python producer. With snappy compression enabled on the producer side, the consumer works fine as long as the message has at most four key-value pairs. For anything larger, an empty string is passed to the handler function inside the consumer.

Any idea what might cause this behaviour?

Consumer configuration

import { Module } from '@nestjs/common';
import { EventsService } from './events.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { EventsController } from './events.controller';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { DbModule } from '../db/db.module';
import { SnappyCodec } from 'kafkajs-snappy';
import { CompressionTypes, CompressionCodecs } from 'kafkajs';

CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;
@Module({
  imports: [
    DbModule,
    ConfigModule.forRoot({
      isGlobal: true, // Makes the configuration global
    }),
    ClientsModule.registerAsync([
      {
        name: 'EVENT_MICROSERVICE',
        useFactory: (configService: ConfigService) => ({
          transport: Transport.KAFKA,
          options: {
            client: {
              compression: CompressionTypes.Snappy,

              clientId: configService.get('kafka.client_id'),
              brokers: [
                `${configService.get('kafka.host')}:${configService.get(
                  'kafka.port',
                )}`,
              ],
              ssl: configService.get('kafka.ssl')
                ? {
                    rejectUnauthorized: false,
                    ca: [configService.get<string>('kafka.ssl_ca')],
                    cert: configService.get<string>('kafka.ssl_certificate'),
                    key: configService.get<string>('kafka.ssl_key'),
                  }
                : false,
            },
            producerOnlyMode: false,
            consumer: {
              groupId: `event-consumer-${configService.get(
                'kafka.consumer_group_id',
              )}`,
              sessionTimeout: 90000,
              heartbeatInterval: 30000,
            },
            send: {
              compression: CompressionTypes.Snappy,
            },
          },
        }),
        inject: [ConfigService],
      },
    ]),
  ],
  providers: [EventsService],
  controllers: [EventsController],
})
export class EventsModule {}
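
For completeness, the handler receiving the (empty) payload is a plain event handler along these lines (a sketch only – the topic name and method signature are placeholders, not the actual controller code):

import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class EventsController {
  // For messages with more than four key-value pairs, `event`
  // arrives here as an empty string instead of the parsed object.
  @EventPattern('events') // placeholder topic name
  handleEvent(@Payload() event: unknown) {
    console.log('received event:', event);
  }
}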

Python producer

import json
import socket

from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "some-random-server:1234",
    "client.id": socket.gethostname(),
    "security.protocol": "SSL",
    "ssl.ca.location": "ca.pem",
    "ssl.certificate.location": "service.cert",
    "ssl.key.location": "service.key",
    "compression.type": "snappy",
}

producer = Producer(conf)

event = {
    "id": "DC127DCC-2689-4CE7-BC42-890BB56BCC4E.1",
    "version": "1.0",
    "occurred": "2023-10-28T09:31:18.147Z",
    "original_encoding": "ascii",
    "original_encodingg": "ascii",  # fifth key – pushes the message past the four-pair threshold
}

producer.produce(TOPIC_NAME, key="key", value=json.dumps(event))
producer.flush()  # make sure the message is actually delivered
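
As a sanity check that snappy itself handles larger payloads, one can round-trip the event outside of Kafka (a sketch using the snappyjs package, which I believe kafkajs-snappy wraps under the hood; the payload contents are made up for illustration):

import * as SnappyJS from 'snappyjs';

// Round-trip a five-pair payload (the size that breaks the consumer)
// through raw snappy compression.
const payload = Buffer.from(
  JSON.stringify({ a: '1', b: '2', c: '3', d: '4', e: '5' }),
);
const compressed = SnappyJS.compress(payload);
const restored = SnappyJS.uncompress(compressed).toString();
console.log(restored === payload.toString()); // true – compression is lossless

If this round-trip succeeds, the data itself is fine and the problem lies in how the consumer's codec is registered, not in the payload size.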

1 Answer

Answered by Juan Chaves:

Adding the following did the trick for me:

import { CompressionTypes, CompressionCodecs } from "kafkajs";
import * as SnappyCodec from "kafkajs-snappy";

// Register the snappy codec with kafkajs before any client is created
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

It looks like NestJS uses kafkajs's CompressionCodecs registry under the hood, so assigning the codec there automatically adds Snappy support. Note that the only difference from the question's setup is the import style: kafkajs-snappy appears to use a CommonJS default export, so the named import { SnappyCodec } likely resolves to undefined and the registration silently does nothing.
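
One more detail: the assignment has to run before the first Kafka connection is made. Putting it at the top of the file that registers the client (as in the question's module) works; so does the application entrypoint. A minimal main.ts sketch, where AppModule and the port are placeholders:

import { NestFactory } from '@nestjs/core';
import { CompressionTypes, CompressionCodecs } from 'kafkajs';
import * as SnappyCodec from 'kafkajs-snappy';
import { AppModule } from './app.module';

// Register the codec before any Kafka connection is established;
// the clients are only created when the app bootstraps below.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}
bootstrap();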