I am new to Kafka and I am experiencing a mixed behaviour when trying to setup proper error handling on my consumer when there is an error. In few instances I am observing retry policy in action - kafka retries my message 5 times(as what I configured) then consumer crashes, then recovers and my group rebalanaces. However, in other instances that's not happens - consumer crashes, then recovers and my group rebalances and consumer attempts to consume the message again and again, inifinitely.
Let's say I have a controller method that's subscribed to a Kafka topic
@EventPattern("cat-topic")
public async createCat(
@Payload()
message: CatRequestDto,
@Ctx() context: IKafkaContext
): Promise<void> {
try {
await this.catService.createCat(message);
} catch (ex) {
this.logger.error(ex);
throw new RpcException(
`Couldn't create a cat`
);
}
}
Using RpcFilter on this method, like this one - https://docs.nestjs.com/microservices/exception-filters :
import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';
@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
return throwError(() => exception.getError());
}
}
I feel like it might be something funky happening with properly committing offsets or something else. Can't pinpoint it.
Any comments are suggestions are greatly appreciated.
Regarding this NestJs Docs it's normal behavior if you use @EventPattern along with RPC exception.
but the main issue is that the exception will keep firing even after the number of retries exceeded but by replacing the original Kafka server with the fixed one it solves the main issue
here is what you should do
ServerKafka