I am having problems with Kafka consumers in applications that use different versions of Spring-kafka, specifically 2.3.13.RELEASE and 2.8.3.
When deploying to production with a blue/green strategy, and with concurrency set on the consumers, all partitions are assigned to the application using version 2.3.13.RELEASE. The new application, which deploys a consumer with Spring-kafka version 2.8.3 on the same topic and group name, is left with no partitions at all once the rebalance finishes.
This is the configuration currently used for the consumer:
@KafkaListener(topics = "spring.kafka.test", groupId = "group-spring-kafka", concurrency = "10")
public void processMessage(String content) {
    System.out.println("Message received: " + content);
}
The topic has 10 partitions.
Below is the log of the applications in steps:
- Start app1 with spring-kafka version 2.3.13.RELEASE with a consumer:
APP1 LOGS
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.13.RELEASE)
....9 thread consumer initialisations before this....
2022-04-10 10:56:44.516 INFO 2748 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group-spring-kafka
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-04-10 10:56:44.519 INFO 2748 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2022-04-10 10:56:44.519 INFO 2748 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2022-04-10 10:56:44.519 INFO 2748 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1649581004519
2022-04-10 10:56:44.520 INFO 2748 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Subscribed to topic(s): spring.kafka.test
.....
2022-04-10 10:56:44.520 INFO 2748 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2022-04-10 10:56:44.528 INFO 2748 --- [ main] com.spring.kafka.sample.DemoApplication : Started DemoApplication in 1.858 seconds (JVM running for 2.218)
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-0]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 10:56:47.670 INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-5]
- Start app2 with spring-kafka version 2.8.3 with a consumer (same topic and group name as consumer app1):
APP1 LOGS
2022-04-10 11:00:02.696 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-7]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-7]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-9]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-0]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-0]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-9]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-1]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-1]
2022-04-10 11:00:02.697 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.699 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.699 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-2]
2022-04-10 11:00:02.699 INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-2]
2022-04-10 11:00:02.699 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.700 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.700 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-4]
2022-04-10 11:00:02.700 INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-4]
2022-04-10 11:00:02.700 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.701 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.701 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-6]
2022-04-10 11:00:02.701 INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-6]
2022-04-10 11:00:02.701 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-5]
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-5]
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-8]
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-8]
2022-04-10 11:00:02.703 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.704 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Attempt to heartbeat failed since group is rebalancing
2022-04-10 11:00:02.704 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Revoking previously assigned partitions [spring.kafka.test-3]
2022-04-10 11:00:02.704 INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions revoked: [spring.kafka.test-3]
2022-04-10 11:00:02.704 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] (Re-)joining group
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-9
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-5
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-7
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-4
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-6
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-1
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-0
2022-04-10 11:00:02.708 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-8
2022-04-10 11:00:02.707 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-2
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-6-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-7, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-5-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-6, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-6 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Successfully joined group with generation 15
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-4 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-7-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-8, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-8 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting newly assigned partitions: spring.kafka.test-3
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-9-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-1 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-7]
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-4-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-5, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-5 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-8]
2022-04-10 11:00:02.710 INFO 2748 --- [ntainer#0-8-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-9, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-5]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-6]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-4]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-9]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-1]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-2]
2022-04-10 11:00:02.711 INFO 2748 --- [ntainer#0-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=group-spring-kafka] Setting offset for partition spring.kafka.test-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=127.0.0.1:9092 (id: 1001 rack: null), epoch=0}}
2022-04-10 11:00:02.712 INFO 2748 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-3]
2022-04-10 11:00:02.750 INFO 2748 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: [spring.kafka.test-0]
APP2 LOGS
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.4)
....9 thread consumer initialisations before this....
2022-04-10 11:00:02.096 INFO 2995 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group-spring-kafka-10
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group-spring-kafka
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
assigned partitions:
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-5-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-8-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-9-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-7-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-3-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-6-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
2022-04-10 11:00:02.719 INFO 2995 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-spring-kafka: partitions assigned: []
As the logs show, APP2 (the version with spring-kafka 2.8.3) is left without any assigned partitions, while APP1 keeps all 10 partitions.
I have tried deploying in reverse order and the result is the same.
If I deploy both applications with the same version, the result is correct: each application gets 5 partitions.
I have also tried forcing the partition assignment strategy to "RangeAssignor" in both versions, but I still have the same problem.
Any ideas? Thanks in advance.
[EDITED 5/17/2022]
I have done more tests with different versions of Spring-Kafka, and in the end the problem can be reproduced as soon as I upgrade from Spring-Kafka version 2.3.13.RELEASE to Spring-Kafka version 2.4.0.RELEASE.
According to the 2.4.0 release announcement:
The 2.4.0 kafka-clients are not binary compatible with Spring for Apache Kafka 2.3 so if you wish to use the 2.4.0 clients, you must upgrade to this version. See the appendix in the reference manual for how to override the jar versions, especially if you are using Spring Boot for dependency management and/or you are using the test embedded Kafka broker.
I'm not sure... could the incompatibility described in the announcement be causing this behaviour?
If you want the load to be evenly balanced between the two applications, you should set each one's concurrency to 5 - otherwise I don't see why there would be any guarantees that one app should not be assigned all partitions if they are in the same consumer group.
Also, the kafka-clients library has been updated to a new major version between these Spring Kafka versions, so that might explain the difference in behavior - AFAIK Spring Kafka doesn't interfere with partition assignment unless explicitly told so.
That being said, I'm not sure why you'd want both versions to consume records at the same time - if there are changes in business logic you won't be able to deterministically tell which version will process which partition / record, unless you manually assign partitions for each version.
What you might try is - let's say you're deploying green - deploy it with autoStartup set to false, make sure app deployment is ok, then stop the listener containers in the blue app and start the containers in the green app. Should anything go wrong, you can stop the containers in green and restart the blue ones as a rollback.
You can add this procedure as part of your blue / green deployment automation, probably along with load balancer routing changes if there are exposed endpoints.
Refer to this section of Spring Kafka's documentation for more information on managing the containers lifecycle.
EDIT: As per @garyrussell's comment, the issue was indeed not related to Spring Kafka but rather to a change in how consumers are assigned a subscription id between kafka-clients versions, "with the range assignor sorting them and all the old subscriptions come first in the list".
Full discussion on the matter in this GitHub Issue.
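To make the EDIT above more concrete, here is a minimal sketch of range-style assignment for a single topic. It is a simplified model and not the actual kafka-clients RangeAssignor code: the class name RangeAssignmentSketch and the method assign are hypothetical, and the member ids are reduced to the client ids visible in the question's logs (consumer-N for APP1, consumer-group-spring-kafka-N for APP2), without the unique suffix that real member ids carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of range assignment for ONE topic.
// Assumption: members are ordered by plain lexicographic comparison of their ids.
public class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted); // lexicographic order decides who is served first

        int perMember = numPartitions / sorted.size(); // base share for every member
        int extra = numPartitions % sorted.size();     // first 'extra' members get one more

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            members.add("consumer-" + i);                    // client id pattern seen in APP1 logs
            members.add("consumer-group-spring-kafka-" + i); // client id pattern seen in APP2 logs
        }
        // 20 members compete for 10 partitions; print who gets what.
        assign(members, 10).forEach((member, partitions) ->
                System.out.println(member + " -> " + partitions));
    }
}
```

In this model, every id of the form consumer-N sorts before every id of the form consumer-group-spring-kafka-N, because a digit compares lower than the letter "g". With 20 members and 10 partitions, only the first 10 members in sorted order receive a partition, so the old-style ids take everything, which is consistent with the empty assignments in the APP2 logs. Under the same assumptions, running 5 consumers per application (10 members in total) would give each member exactly one partition.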