We're using NestJS as our TypeScript framework for a microservice-based architecture. Some of our deployments are what we call "Kafka workers": pods that run code that doesn't expose any REST endpoints, but only listens to Kafka topics and handles incoming events.
The problem is that the global exception filter, which is configured to catch any thrown exception, does not catch anything (and we end up with Node's UnhandledPromiseRejection).
The exception filter is configured in a very basic way (following the NestJS docs guidelines):
@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
  private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);

  catch(error: Error, host: ArgumentsHost): void {
    this.logger.error('Uncaught exception', error);
  }
}
Our controller for such workers is configured like this:
@Controller()
export class KafkaWorkerController {
  private readonly logger = new AppLogger(KafkaWorkerController.name);

  constructor() {
    this.logger.log('Init');
  }

  @EventPattern(KafkaTopic.PiiRemoval)
  async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
    await asyncDoSomething();
    throw new Error('Business logic failed');
  }
}
Now, we expect the global exception filter to catch the error thrown from inside the controller handler function (as well as real errors, thrown from real functions nested inside it for sync/async operations). This does not happen.
Again following the NestJS docs on implementing such a filter, I tried several ways (and combinations of ways) to register that filter, with no success:
- listing it as a provider on the top-level module definition: { provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
- using the @UseFilters(KafkaWorkerExceptionFilter) decorator above the controller class
- using Nest's app.useGlobalFilters(new KafkaWorkerExceptionFilter()); in the main.ts file, before/after calling app.connectMicroservice(...) with the Kafka config
Just as a reference for how we init the app in the "kafka-worker" configuration, here is the main.ts file:
async function bootstrap() {
  const app = await NestFactory.create(KafkaWorkerAppModule, {
    logger: ['error', 'warn', 'debug', 'log', 'verbose'],
  });
  app.use(Helmet());
  app.useGlobalPipes(
    new ValidationPipe({
      disableErrorMessages: false,
      whitelist: true,
      transform: true,
    }),
  );
  const logger: AppLogger = new AppLogger('Bootstrap');
  const config: ConfigService = app.get(ConfigService);
  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: SECRET_VALUE,
        brokers: [SECRET_HOST_ADDRESS],
        ssl: true,
        sasl: SOME_BOOLEAN_VALUE
          ? {
              mechanism: 'plain',
              username: SECRET_VALUE,
              password: SECRET_VALUE,
            }
          : undefined,
      },
      consumer: {
        allowAutoTopicCreation: false,
        groupId: SECRET_VALUE,
      },
    },
  });
  await app.startAllMicroservices();
  const port = config.servicePort || 3000;
  await app.listen(port, () => {
    logger.log(`Kafka Worker listening on port: ${port}`);
    logger.log(`Environment: ${config.nodeEnv}`);
  });
}
bootstrap();
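When using the connectMicroservice() method, you're creating a Hybrid Application. By default, the microservices in a hybrid application do not inherit the global pipes, interceptors, guards and filters configured for the main (HTTP-based) application. So all you need to do in your case is:
- Register the filter in one of the ways you already tried:
  - add APP_FILTER to the KafkaWorkerAppModule
  - call app.useGlobalFilters(new KafkaWorkerExceptionFilter()) in main.ts
  - put @UseFilters(KafkaWorkerExceptionFilter) on each relevant provider - I would avoid this for global filters
- Add the inheritAppConfig option to app.connectMicroservice() in your main.ts: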
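A minimal sketch of how the inheritAppConfig option can be passed, assuming the filter is registered with app.useGlobalFilters(). The import paths, client ID, broker address, group ID and port below are placeholders that are not taken from the question; substitute your own configuration.

```typescript
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
// Assumption: hypothetical file paths, adjust to your project layout.
import { KafkaWorkerAppModule } from './kafka-worker-app.module';
import { KafkaWorkerExceptionFilter } from './kafka-worker-exception.filter';

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(KafkaWorkerAppModule);

  // Register the filter globally on the main application.
  app.useGlobalFilters(new KafkaWorkerExceptionFilter());

  app.connectMicroservice<MicroserviceOptions>(
    {
      transport: Transport.KAFKA,
      options: {
        // Placeholder values, not the real configuration.
        client: { clientId: 'kafka-worker', brokers: ['localhost:9092'] },
        consumer: { groupId: 'kafka-worker-group' },
      },
    },
    // Second argument: let the microservice inherit the global
    // filters, pipes, guards and interceptors of the main app.
    { inheritAppConfig: true },
  );

  await app.startAllMicroservices();
  await app.listen(3000);
}

void bootstrap();
```

With this in place, errors thrown from @EventPattern handlers should reach the same global filter as errors from HTTP handlers, so the filter's catch() method may want to check host.getType() if the two contexts need different handling.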