Incompatibility between Spring Kafka consumers with versions 2.3.13.RELEASE and 2.8.3

I am having problems with Kafka consumers in applications that use different versions of Spring-Kafka, specifically 2.3.13.RELEASE and 2.8.3.

When deploying to production (PRO) with a blue/green strategy and concurrency enabled on the consumers, all partitions end up assigned to the application running 2.3.13.RELEASE. The new application, whose consumer uses Spring-Kafka 2.8.3 (same topic and group name), is left with no partitions at all once the partition rebalancing finishes.

This is the configuration currently used for the consumer:

    @KafkaListener(topics = "spring.kafka.test", groupId = "group-spring-kafka", concurrency = "10")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }

The topic has 10 partitions.

Below are the logs of both applications, step by step:

  1. Start app1 (spring-kafka 2.3.13.RELEASE) with a consumer:

APP1 LOGS

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.2.13.RELEASE)

... (initialisation of the 9 previous consumer threads omitted) ...

2022-04-10 10:56:44.516  INFO 2748 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group-spring-kafka
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2022-04-10 10:56:44.519  INFO 2748 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1649581004519
2022-04-10 10:56:44.520  INFO 2748 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Subscribed to topic(s): spring.kafka.test

.....

2022-04-10 10:56:44.520  INFO 2748 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2022-04-10 10:56:44.528  INFO 2748 --- [           main] com.spring.kafka.sample.DemoApplication  : Started DemoApplication in 1.858 seconds (JVM running for 2.218)
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-0]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 10:56:47.670  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-5]

  2. Start app2 (spring-kafka 2.8.3) with a consumer (same topic and group name as the app1 consumer):

APP1 LOGS

2022-04-10 11:00:02.696  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-7]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-7]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-9]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-0]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-0]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-9]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-1]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-1]
2022-04-10 11:00:02.697  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-2]
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-2]
2022-04-10 11:00:02.699  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-4]
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-4]
2022-04-10 11:00:02.700  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-6]
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-6]
2022-04-10 11:00:02.701  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-5]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-5]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-8]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-8]
2022-04-10 11:00:02.703  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-3]
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions revoked: [spring.kafka.test-3]
2022-04-10 11:00:02.704  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-9
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-5
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-7
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-4
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-6
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-1
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-0
2022-04-10 11:00:02.708  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-8
2022-04-10 11:00:02.707  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-2
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-6 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-4 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-8 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-3
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-1 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-5 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-8]
2022-04-10 11:00:02.710  INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-5]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 11:00:02.711  INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.712  INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 11:00:02.750  INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: [spring.kafka.test-0]

APP2 LOGS

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

... (initialisation of the 9 previous consumer threads omitted) ...

2022-04-10 11:00:02.096  INFO 2995 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-group-spring-kafka-10
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = group-spring-kafka
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

assigned partitions: 
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719  INFO 2995 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-spring-kafka: partitions assigned: []

As can be seen, APP2 (the application with spring-kafka 2.8.3) is left with no assigned partitions, while APP1 keeps all 10 partitions.

I have tried deploying in reverse order and the result is the same.

If both applications use the same version, the result is correct: each application gets 5 partitions.

I have also tried forcing the "RangeAssignor" partition assignment strategy in both versions, but the problem persists.

Any ideas? Thanks in advance.

[EDITED 5/17/2022]

I have done more tests with different versions of Spring-Kafka, and the problem first appears when upgrading from Spring-Kafka 2.3.13.RELEASE to Spring-Kafka 2.4.0.RELEASE.

The release announcement for 2.4.0 says:

The 2.4.0 kafka-clients are not binary compatible with Spring for Apache Kafka 2.3 so if you wish to use the 2.4.0 clients, you must upgrade to this version. See the appendix in the reference manual for how to override the jar versions, especially if you are using Spring Boot for dependency management and/or you are using the test embedded Kafka broker.

I'm not sure: could the incompatibility described in the announcement be causing this behaviour?

Answer by Tomaz Fernandes

If you want the load to be balanced evenly between the two applications, you should set each one's concurrency to 5. Otherwise, since both apps are in the same consumer group, I don't see anything that guarantees one app won't be assigned all the partitions.

Also, the kafka-clients library moved to a new major version between these Spring Kafka versions, which might explain the difference in behavior; AFAIK, Spring Kafka doesn't interfere with partition assignment unless explicitly told to.

That being said, I'm not sure why you'd want both versions to consume records at the same time: if there are changes in business logic, you won't be able to tell deterministically which version will process which partition or record, unless you manually assign partitions to each version.

What you might try (let's say you're deploying green) is to deploy it with autoStartup set to false, make sure the deployment is OK, then stop the listener containers in the blue app and start the containers in the green app. Should anything go wrong, you can stop the containers in green and restart the blue ones as a rollback.

You can make this procedure part of your blue/green deployment automation, probably alongside the load balancer routing changes if there are exposed endpoints.
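A minimal sketch of that switch-over, assuming the listener from the question is given a hypothetical id ("springKafkaTestListener") so that its container can be looked up in Spring's KafkaListenerEndpointRegistry. The ListenerSwitch class and its startConsuming/stopConsuming methods are also hypothetical; how your deployment automation invokes them (an admin endpoint, a JMX operation, etc.) is left open.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical helper: owns the listener and exposes start/stop hooks for the deployment pipeline.
@Component
public class ListenerSwitch {

    // Hypothetical listener id, used to look the container up in the registry.
    static final String LISTENER_ID = "springKafkaTestListener";

    private final KafkaListenerEndpointRegistry registry;

    public ListenerSwitch(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // autoStartup = "false": the container is created at startup but does not
    // join the consumer group until start() is called on it.
    // groupId is set explicitly, so the id is not used as the group id.
    @KafkaListener(id = LISTENER_ID, topics = "spring.kafka.test", groupId = "group-spring-kafka",
            concurrency = "10", autoStartup = "false")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }

    // Green: called once the blue containers have stopped.
    public void startConsuming() {
        container().start();
    }

    // Blue: called before switching over; on green, called as part of a rollback.
    public void stopConsuming() {
        container().stop();
    }

    private MessageListenerContainer container() {
        return registry.getListenerContainer(LISTENER_ID);
    }
}
```

A switch-over is then stopConsuming() on blue followed by startConsuming() on green, and a rollback is the reverse. The blue app needs an equivalent hook (with autoStartup left at its default) so its containers can be stopped. Waiting for the blue containers to stop before starting green means the two versions never consume from the group at the same time, at the cost of a short pause in consumption.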

Refer to this section of Spring Kafka's documentation for more information on managing the listener containers' lifecycle.

EDIT: As per @garyrussell's comment, the issue was indeed not related to Spring Kafka but rather to a change in how consumers are assigned a subscription id between kafka-clients versions, "with the range assignor sorting them and all the old subscriptions come first in the list".

The full discussion is in this GitHub issue.
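The following simplified, single-topic sketch shows why the order of member ids decides the outcome. It is not Kafka's RangeAssignor source, only the same idea: sort the member ids, then hand out contiguous blocks of partitions. Based on the clientId values in the logs, it assumes that the 2.3.x app's consumers have client ids consumer-1 to consumer-10 and the 2.8.x app's have consumer-group-spring-kafka-1 to consumer-group-spring-kafka-10. It also assumes that each member id is the client id followed by "-" and a broker-generated UUID. The fixed "old"/"new" suffixes are hypothetical stand-ins for those UUIDs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {

    // Range-style assignment for one topic: sort the member ids, then give each member
    // a contiguous block of partitions; the first (partitions % members) members in
    // sorted order get one extra partition.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);
        int perMember = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, withExtra);
            int length = perMember + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    // Builds hypothetical member ids "<clientIdPrefix><n>-<suffix>" for n = 1..count;
    // the suffix stands in for the UUID the broker appends.
    static List<String> members(String clientIdPrefix, int count, String suffix) {
        List<String> ids = new ArrayList<>();
        for (int n = 1; n <= count; n++) {
            ids.add(clientIdPrefix + n + "-" + suffix);
        }
        return ids;
    }

    // Counts the partitions owned by members whose id ends with the given suffix.
    static int owned(Map<String, List<Integer>> assignment, String suffix) {
        int total = 0;
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getKey().endsWith(suffix)) {
                total += e.getValue().size();
            }
        }
        return total;
    }

    // Runs one assignment over both apps' members and summarizes the split.
    static String split(List<String> oldApp, List<String> newApp, int numPartitions) {
        List<String> all = new ArrayList<>(oldApp);
        all.addAll(newApp);
        Map<String, List<Integer>> a = assign(all, numPartitions);
        return "old app: " + owned(a, "-old") + ", new app: " + owned(a, "-new");
    }

    public static void main(String[] args) {
        // Mixed versions, concurrency 10 each: "consumer-<digit>..." sorts before "consumer-g...".
        System.out.println(split(members("consumer-", 10, "old"),
                members("consumer-group-spring-kafka-", 10, "new"), 10)); // prints "old app: 10, new app: 0"
        // Same client-id scheme on both sides, concurrency 10 each: ids interleave.
        System.out.println(split(members("consumer-", 10, "old"),
                members("consumer-", 10, "new"), 10)); // prints "old app: 5, new app: 5"
        // Mixed versions, concurrency 5 each: as many members as partitions.
        System.out.println(split(members("consumer-", 5, "old"),
                members("consumer-group-spring-kafka-", 5, "new"), 10)); // prints "old app: 5, new app: 5"
    }
}
```

The first run matches the question. Every "consumer-<digit>…" id sorts before every "consumer-g…" id, so the 2.3.x app's ten consumers take all ten partitions. The same arithmetic also gives partition 1 to consumer-10, as in the APP1 logs, because "consumer-10-…" sorts between "consumer-1-…" and "consumer-2-…". The second run shows why two apps on the same version split 5/5: their ids interleave. Forcing RangeAssignor on both sides would not be expected to help, since range assignment is what produces the skew. With concurrency 5 per app, as the answer suggests, there are exactly as many members as partitions, so each member gets one regardless of order.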