NestJS: Global exception filter not catching anything thrown from a Kafka-based microservice app

We're using NestJS as our TypeScript framework for a microservice-based architecture. Some of our deployments are what we call "Kafka workers": pods that run code that doesn't expose any REST endpoints and only listens to Kafka topics and handles incoming events.

The problem is that the global exception filter, which is configured to catch any thrown exception, does not catch anything (and we end up with Node's UnhandledPromiseRejection).

The exception filter is configured in the most basic way, like this (following the NestJS docs guidelines):

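import { ArgumentsHost, Catch, ExceptionFilter } from '@nestjs/common';
// AppLogger is a project-specific logger class (its import is omitted here).

// @Catch() with no arguments matches every thrown exception.
@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
  private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);

  catch(error: Error, host: ArgumentsHost): void {
    this.logger.error('Uncaught exception', error);
  }
}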

Our controller for such workers is configured like this:

import { Controller } from '@nestjs/common';
import { EventPattern } from '@nestjs/microservices';

@Controller()
export class KafkaWorkerController {
  private readonly logger = new AppLogger(KafkaWorkerController.name);

  constructor() {
    this.logger.log('Init');
  }

  // Handles events arriving on the PII-removal topic.
  @EventPattern(KafkaTopic.PiiRemoval)
  async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
    await asyncDoSomething();
    throw new Error('Business logic failed');
  }
}

We expect the global exception filter to catch the error thrown from inside the controller handler function (as well as real errors thrown from functions nested inside it, whether sync or async). This does not happen.

Again following the NestJS docs on implementing such a filter, I tried several ways of registering it, and combinations of them, with no success:

  • listing it as a provider in the top-level module definition: { provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
  • using the @UseFilters(KafkaWorkerExceptionFilter) decorator above the controller class
  • calling Nest's app.useGlobalFilters(new KafkaWorkerExceptionFilter()) in main.ts, both before and after calling app.connectMicroservice(...) with the Kafka config

For reference, here is how we initialize the app in the "kafka-worker" configuration (the main.ts file):

async function bootstrap() {
  const app = await NestFactory.create(KafkaWorkerAppModule, {
    logger: ['error', 'warn', 'debug', 'log', 'verbose'],
  });

  app.use(Helmet());
  app.useGlobalPipes(
    new ValidationPipe({
      disableErrorMessages: false,
      whitelist: true,
      transform: true,
    }),
  );
  const logger: AppLogger = new AppLogger('Bootstrap');

  const config: ConfigService = app.get(ConfigService);

  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: SECRET_VALUE,
        brokers: [SECRET_HOST_ADDRESS],
        ssl: true,
        sasl: SOME_BOOLEAN_VALUE
          ? {
              mechanism: 'plain',
              username: SECRET_VALUE,
              password: SECRET_VALUE,
            }
          : undefined,
      },
      consumer: {
        allowAutoTopicCreation: false,
        groupId: SECRET_VALUE,
      },
    },
  });

  await app.startAllMicroservices();
  const port = config.servicePort || 3000;
  await app.listen(port, () => {
    logger.log(`Kafka Worker listening on port: ${port}`);
    logger.log(`Environment: ${config.nodeEnv}`);
  });
}

bootstrap();

There is 1 answer

ZimGil (best answer)

When using the connectMicroservice() method, you're creating a Hybrid Application.

By default, a hybrid application will not inherit global pipes, interceptors, guards and filters configured for the main (HTTP-based) application. To inherit these configuration properties from the main application, set the inheritAppConfig property in the second argument (an optional options object) of the connectMicroservice() call, as follows:

const microservice = app.connectMicroservice({
 transport: Transport.TCP
}, { inheritAppConfig: true });
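
To make that default concrete, here is a toy model of the idea in plain TypeScript. Everything in it (ToyApp, ToyMicroservice, AppConfig, dispatch) is a hypothetical stand-in and not NestJS's actual implementation. It only illustrates that a connected microservice sees the app's global filters when it shares the app's config, and otherwise does not.

```typescript
// Toy model only: ToyApp, ToyMicroservice and AppConfig are hypothetical
// stand-ins used for illustration, not NestJS classes.
type Filter = (error: Error) => void;
type DispatchResult = 'ok' | 'filtered' | 'unhandled';

class AppConfig {
  readonly globalFilters: Filter[] = [];
}

class ToyMicroservice {
  private readonly config: AppConfig;

  constructor(config: AppConfig) {
    this.config = config;
  }

  // Runs a handler; a thrown error goes to the global filters, if any exist.
  dispatch(handler: () => void): DispatchResult {
    try {
      handler();
      return 'ok';
    } catch (raw) {
      const error = raw instanceof Error ? raw : new Error(String(raw));
      if (this.config.globalFilters.length === 0) {
        return 'unhandled'; // nothing is registered on this config
      }
      for (const filter of this.config.globalFilters) {
        filter(error);
      }
      return 'filtered';
    }
  }
}

class ToyApp {
  private readonly config = new AppConfig();

  useGlobalFilters(...filters: Filter[]): void {
    this.config.globalFilters.push(...filters);
  }

  // Shares the app's config only when inheritAppConfig is true;
  // otherwise the microservice gets a fresh, empty config.
  connectMicroservice(options: { inheritAppConfig?: boolean } = {}): ToyMicroservice {
    const config = options.inheritAppConfig ? this.config : new AppConfig();
    return new ToyMicroservice(config);
  }
}

const caught: string[] = [];
const app = new ToyApp();
app.useGlobalFilters((error) => {
  caught.push(error.message);
});

const fail = (): void => {
  throw new Error('Business logic failed');
};

const isolatedResult = app.connectMicroservice().dispatch(fail);
const inheritedResult = app
  .connectMicroservice({ inheritAppConfig: true })
  .dispatch(fail);
console.log(isolatedResult, inheritedResult, caught);
```

In this sketch the microservice connected without the option reports 'unhandled', while the one connected with inheritAppConfig: true reports 'filtered' and the filter records the error message. That mirrors the symptom in the question.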

So all you need to do in your case is:

  1. Register the filter using ONE of these methods:
    1. As an APP_FILTER provider in the KafkaWorkerAppModule
    2. As a global filter using app.useGlobalFilters(new KafkaWorkerExceptionFilter()) in main.ts
    3. With @UseFilters(KafkaWorkerExceptionFilter) on each relevant controller (I would avoid this for a filter that is meant to be global)
  2. Add the inheritAppConfig option to app.connectMicroservice() in your main.ts:
app.connectMicroservice(
  {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: SECRET_VALUE,
        brokers: [SECRET_HOST_ADDRESS],
        ssl: true,
        sasl: SOME_BOOLEAN_VALUE
          ? {
              mechanism: 'plain',
              username: SECRET_VALUE,
              password: SECRET_VALUE,
            }
          : undefined,
      },
      consumer: {
        allowAutoTopicCreation: false,
        groupId: SECRET_VALUE,
      },
    },
  },
  { inheritAppConfig: true },
);
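
If you go with option 1.1, the module registration could look roughly like the sketch below. The two relative import paths are assumptions about your project layout, so adjust them to wherever the controller and filter actually live. APP_FILTER itself is exported by @nestjs/core.

```typescript
import { Module } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
// Hypothetical file paths: replace with your project's real locations.
import { KafkaWorkerController } from './kafka-worker.controller';
import { KafkaWorkerExceptionFilter } from './kafka-worker-exception.filter';

@Module({
  controllers: [KafkaWorkerController],
  providers: [
    // Registers the filter globally through Nest's dependency injection.
    { provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter },
  ],
})
export class KafkaWorkerAppModule {}
```

One reason to prefer this over app.useGlobalFilters(new KafkaWorkerExceptionFilter()) is that the filter is instantiated by Nest's container, so it can inject other providers if you need that later. Either way, the inheritAppConfig option from step 2 is still required for the Kafka side to pick the filter up.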