KafkaJS constantly throwing connection errors


I built a number of services with Node.js, TypeScript, and KafkaJS, and every one of them starts throwing connection errors after a while. None of the services are used very often, so they sometimes idle for hours or even days. The topics and everything else are set up in Confluent.

The logs look like this:

[2023-11-02T16:18:51.088Z] info: Heartbeat server listening on port 5555
[2023-11-02T16:18:51.664Z] info: Message: Starting
[2023-11-02T16:18:52.146Z] info: Message: Consumer has joined the group
[2023-11-03T02:09:48.973Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:09:57.163Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:12:19.670Z] error: Message: Crash: KafkaJSNumberOfRetriesExceeded: Request Fetch(key: 1, version: 11) timed out
[2023-11-03T02:12:19.690Z] info: Message: Stopped
[2023-11-03T02:12:19.693Z] error: Message: Restarting the consumer in 12778ms
[2023-11-03T02:12:32.472Z] info: Message: Starting
[2023-11-03T02:12:33.101Z] info: Message: Consumer has joined the group
[2023-11-03T02:25:24.907Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:25:24.908Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:25:24.909Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:25:24.910Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T02:25:28.999Z] error: Message: Connection error: read ETIMEDOUT
[2023-11-03T18:55:47.672Z] error: Message: Connection timeout
[2023-11-03T18:55:47.930Z] error: Message: Connection error: Client network socket disconnected before secure TLS connection was established
[2023-11-05T19:15:12.382Z] error: Error: This server is not the leader for that topic-partition, Message: Response Fetch(key: 1, version: 11)
[2023-11-05T19:15:13.711Z] error: Message: Connection timeout
[2023-11-05T19:15:13.962Z] error: Message: Connection error: Client network socket disconnected before secure TLS connection was established
[2023-11-06T05:31:58.978Z] error: Message: Connection timeout
[2023-11-06T05:31:59.253Z] error: Message: Connection error: Client network socket disconnected before secure TLS connection was established

or like this:

[2023-11-03T13:06:36.781Z] info: Heartbeat server listening on port 5555
[2023-11-03T13:06:37.518Z] info: Message: Starting
[2023-11-03T13:06:38.220Z] info: Message: Consumer has joined the group
[2023-11-04T15:00:18.318Z] error: Message: Connection timeout
[2023-11-04T15:00:19.369Z] error: Message: Connection error: Client network socket disconnected before secure TLS connection was established
[2023-11-04T23:20:58.184Z] error: Error: This is not the correct coordinator for this group, Message: Response Heartbeat(key: 12, version: 3)
[2023-11-04T23:20:59.997Z] warn: Error: This is not the correct coordinator for this group, Message: The group is rebalancing, re-joining
[2023-11-04T23:21:01.846Z] info: Message: Consumer has joined the group
(node:1) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 connect listeners added to [TLSSocket]. Use emitter.setMaxListeners() to increase limit(Use `node --trace-warnings ...` to show where the warning was created)

I'm using KafkaJS in this way in every service:

import { Kafka, SASLOptions, logLevel } from "kafkajs";
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
import * as dotenv from "dotenv";
import { WinstonLogCreator } from "./logger";
import * as net from "net";
import * as tls from "tls";
dotenv.config();

const server = process.env.KAFKA_SERVER || "";

const sasl: SASLOptions = {
  username: process.env.KAFKA_KEY || "",
  password: process.env.KAFKA_SECRET || "",
  mechanism: "plain"
};
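// sasl is always an object here, so this is effectively always true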
const ssl = !!sasl;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const myCustomSocketFactory = ({ host, port, ssl, onConnect }: any) => {
  const socket = ssl
    ? tls.connect(Object.assign({ host, port }, ssl), onConnect)
    : net.connect({ host, port }, onConnect);

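  // Send TCP keep-alive probes every 5 seconds so idle connections are not silently dropped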
  socket.setKeepAlive(true, 5000);

  return socket;
};

export const kafka = new Kafka({
  logLevel: logLevel.DEBUG,
  logCreator: WinstonLogCreator,
  clientId: process.env.PROCESS_NAME,
  brokers: [server],
  socketFactory: myCustomSocketFactory,
  ssl,
  sasl
});

export const registry = new SchemaRegistry({
  host: process.env.SCHEMA_REGISTRY_SERVER || "",
  auth: {
    username: process.env.SCHEMA_KEY || "",
    password: process.env.SCHEMA_SECRET || ""
  }
});
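
The consumers then use this client roughly as follows. This is a simplified sketch rather than my exact code: the import path, group id, and topic name are placeholders, and it assumes the kafkajs v2 subscribe signature.

import { kafka, registry } from "./kafka"; // the module shown above (path is a placeholder)

// Placeholder group id for illustration
const consumer = kafka.consumer({ groupId: "example-group" });

export async function run(): Promise<void> {
  await consumer.connect();
  // Placeholder topic name for illustration
  await consumer.subscribe({ topics: ["example-topic"], fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) {
        return;
      }
      // Decode the Avro payload via the Confluent schema registry client
      const decoded = await registry.decode(message.value);
      console.log(decoded);
    }
  });
}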

I've played around with different timeouts and other solutions I found, but nothing has helped so far. Does anyone have an idea why this happens?
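
For reference, the timeout- and retry-related knobs I experimented with are along these lines. The values below are only illustrative, not my exact settings, the group id is a placeholder, and the defaults in the comments are taken from the KafkaJS docs:

import { Kafka } from "kafkajs";

// Illustrative values only -- not a recommendation
const tunedKafka = new Kafka({
  clientId: process.env.PROCESS_NAME,
  brokers: [process.env.KAFKA_SERVER || ""],
  ssl: true,
  sasl: {
    mechanism: "plain",
    username: process.env.KAFKA_KEY || "",
    password: process.env.KAFKA_SECRET || ""
  },
  connectionTimeout: 10000, // default: 1000 ms
  requestTimeout: 60000,    // default: 30000 ms
  retry: {
    initialRetryTime: 300,  // default: 300 ms
    retries: 10             // default: 5
  }
});

// Consumer-side timeouts can be tuned as well (placeholder group id)
const tunedConsumer = tunedKafka.consumer({
  groupId: "example-group",
  sessionTimeout: 60000,    // default: 30000 ms
  heartbeatInterval: 3000,  // default: 3000 ms
  rebalanceTimeout: 90000   // default: 60000 ms
});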

Edit:
I just found this error as well. Since I think it's related to the errors above, I'm adding it here.

[2023-11-04T09:16:58.651Z] error: Message: Connection error: 384B9C74447F0000:error:0A0000CD:SSL routines:ssl3_read_bytes:invalid alert:../deps/openssl/openssl/ssl/record/rec_layer_s3.c:1556: