Kafka consumer has no memeberAssignment and unable to consume messages

134 views Asked by At

I have a nestjs app that is meant to create a consumer instance, where i listen to events, and i also want to provide the consumer instance to a health-check service making sure that the consumer is healthy, without making additional connections.

I have a class to extend serverKafka

export class ExtendedClientKafka extends ServerKafka implements CustomTransportStrategy {
  constructor(options: KafkaOptions['options']) {
    super(options);
  }

  get publicConsumer() {
    return this.consumer;
  }

  async connect() {
    // this.consumer.subscribe({ topic: process.env.KAFKA_TOPIC });

    await this.listen(() => {
      console.log('Kafka consumer connected');
    });
  }
}

and it is used in kafkaService to connect

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
  private client: ExtendedClientKafka;
  private readonly logger = new Logger(KafkaService.name);

  async onModuleInit() {
    if (process.env.NODE_ENV !== 'localhost') {
      try {
        this.client = new ExtendedClientKafka({
          client: {
            brokers: [process.env.KAFKA_BROKER],
          },
          consumer: {
            groupId: process.env.KAFKA_GROUP_ID,
            retry: { retries: 3 },
          },
        });
        await this.client.connect();
        this.logger.log('Connected to Kafka');
      } catch (error) {
        this.logger.error(`Failed to connect to Kafka:, ${(error as Error).message}`);
        throw new Error('Failed to establish connection to Kafka. Please check your configuration and network.');
      }
    }
  }

  async onModuleDestroy() {
    await this.disconnect();
  }

  getClient(): ExtendedClientKafka {
    return this.client;
  }

  async disconnect(): Promise<void> {
    if (this.client) {
      await this.client.close();
    }
  }
}

My kafkaservice is then used in all modules where i want the connection to kafka. My issue is that in a controller i want to use nestjs eventpattern decorator to listen for events like this

@Controller()
export class ConsumerController {
  private readonly logger = new Logger(ConsumerController.name);
  constructor(
    private readonly kafkaService: KafkaService,
    private readonly consumerService: ConsumerService) {}

  @EventPattern(process.env.KAFKA_TOPIC)
  async handleScheduledJobs(message: Buffer) {
    this.logger.log("Received message from Kafka", message.toString(), message);
    // await this.consumerService.elaborateMessage(message);
  }
}

but after starting my app i join the consumer group but get no memberAssignment and unable to log any messages written to that topic. Here is a log of my consumer joining:

LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2023-10-30T13:09:26.718Z","logger":"kafkajs","groupId":"device-parameters-manager-server","memberId":"nestjs-consumer-server-05a610ed-ea78-40fe-b2ce-0eae0d36d8bf","leaderId":"nestjs-consumer-server-05a610ed-ea78-40fe-b2ce-0eae0d36d8bf","isLeader":true,"memberAssignment":{},"groupProtocol":"RoundRobinAssigner","duration":70}

If use app.connectMicroservices in main.ts i am able to join and process messages but unable to share the consumer instance in my health-check service. the health-check service will require me to make a different connection each time.

0

There are 0 answers