I have a NestJS app that creates a Kafka consumer instance to listen for events. I also want to provide that same consumer instance to a health-check service, so it can verify that the consumer is healthy without opening additional connections.
I have a class that extends ServerKafka:
import { CustomTransportStrategy, KafkaOptions, ServerKafka } from '@nestjs/microservices';

export class ExtendedClientKafka extends ServerKafka implements CustomTransportStrategy {
  constructor(options: KafkaOptions['options']) {
    super(options);
  }

  // Exposes the protected consumer so other services can reuse it.
  get publicConsumer() {
    return this.consumer;
  }

  async connect() {
    // this.consumer.subscribe({ topic: process.env.KAFKA_TOPIC });
    await this.listen(() => {
      console.log('Kafka consumer connected');
    });
  }
}
It is used in KafkaService to connect:
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
  private client: ExtendedClientKafka;
  private readonly logger = new Logger(KafkaService.name);

  async onModuleInit() {
    // Skip the Kafka connection when running locally.
    if (process.env.NODE_ENV !== 'localhost') {
      try {
        this.client = new ExtendedClientKafka({
          client: {
            brokers: [process.env.KAFKA_BROKER],
          },
          consumer: {
            groupId: process.env.KAFKA_GROUP_ID,
            retry: { retries: 3 },
          },
        });
        await this.client.connect();
        this.logger.log('Connected to Kafka');
      } catch (error) {
        this.logger.error(`Failed to connect to Kafka: ${(error as Error).message}`);
        throw new Error('Failed to establish connection to Kafka. Please check your configuration and network.');
      }
    }
  }

  async onModuleDestroy() {
    await this.disconnect();
  }

  getClient(): ExtendedClientKafka {
    return this.client;
  }

  async disconnect(): Promise<void> {
    if (this.client) {
      await this.client.close();
    }
  }
}
KafkaService is then used in every module that needs the Kafka connection. My issue is that in a controller I want to use the NestJS @EventPattern decorator to listen for events, like this:
@Controller()
export class ConsumerController {
  private readonly logger = new Logger(ConsumerController.name);

  constructor(
    private readonly kafkaService: KafkaService,
    private readonly consumerService: ConsumerService,
  ) {}

  @EventPattern(process.env.KAFKA_TOPIC)
  async handleScheduledJobs(message: Buffer) {
    this.logger.log("Received message from Kafka", message.toString(), message);
    // await this.consumerService.elaborateMessage(message);
  }
}
However, after starting my app, the consumer joins the consumer group but gets an empty memberAssignment, and no messages written to that topic are logged. Here is the log of my consumer joining:
LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2023-10-30T13:09:26.718Z","logger":"kafkajs","groupId":"device-parameters-manager-server","memberId":"nestjs-consumer-server-05a610ed-ea78-40fe-b2ce-0eae0d36d8bf","leaderId":"nestjs-consumer-server-05a610ed-ea78-40fe-b2ce-0eae0d36d8bf","isLeader":true,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":70}
If I use app.connectMicroservice in main.ts, I am able to join the group and process messages, but I am unable to share the consumer instance with my health-check service, which would then have to open a separate connection each time.
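To make the goal concrete, here is a minimal, dependency-free sketch of the behaviour I am after: one consumer, connected once, used both for handling events and for the health check. It is only a simplified model and not NestJS or kafkajs code. `FakeConsumer`, `FakeKafkaServer`, `HealthCheck`, `bootstrap`, and the `emit` helper are all hypothetical names made up for illustration, and the "subscribe to every topic that has a handler" step is my assumption about how the wiring should behave.

```typescript
// Simplified model of the goal: one consumer, one connection, shared by the
// event listener and the health check. All names are hypothetical.

type Handler = (payload: string) => void;

class FakeConsumer {
  static connections = 0; // how many consumers have been connected so far
  private connected = false;
  readonly topics = new Set<string>();

  connect(): void {
    if (!this.connected) {
      this.connected = true;
      FakeConsumer.connections += 1;
    }
  }

  subscribe(topic: string): void {
    this.topics.add(topic);
  }

  isConnected(): boolean {
    return this.connected;
  }
}

class FakeKafkaServer {
  private readonly handlers = new Map<string, Handler>();
  private readonly consumer = new FakeConsumer();

  // Stand-in for what registering an event handler would do.
  addHandler(topic: string, handler: Handler): void {
    this.handlers.set(topic, handler);
  }

  // Connects once and subscribes to every topic that has a handler.
  listen(): void {
    this.consumer.connect();
    this.handlers.forEach((_handler, topic) => this.consumer.subscribe(topic));
  }

  // Helper that simulates a message arriving on a topic.
  emit(topic: string, payload: string): void {
    this.handlers.get(topic)?.(payload);
  }

  get publicConsumer(): FakeConsumer {
    return this.consumer;
  }
}

class HealthCheck {
  private readonly server: FakeKafkaServer;

  // Receives the already-connected server instead of creating its own.
  constructor(server: FakeKafkaServer) {
    this.server = server;
  }

  isHealthy(): boolean {
    const consumer = this.server.publicConsumer;
    return consumer.isConnected() && consumer.topics.size > 0;
  }
}

function bootstrap(topic: string) {
  const received: string[] = [];
  const server = new FakeKafkaServer();
  server.addHandler(topic, (payload) => {
    received.push(payload);
  });
  server.listen();
  const health = new HealthCheck(server); // same instance, no second connection
  return { server, health, received };
}
```

Calling `bootstrap('some-topic')` and then `server.emit('some-topic', 'hello')` delivers the payload to the handler, while `health.isHealthy()` inspects the very same consumer. In the real app, I would expect the equivalent to be a single `ExtendedClientKafka` instance that both receives the `@EventPattern` handlers and is injected into the health-check service.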