I'm trying to test parallel consumption from a 3-partition topic in Kafka. I assumed that all consumers would be able to subscribe and process messages, but I get this exception:
java.lang.IllegalStateException: Failed to be assigned partitions from the embedded topics
Before the exception, this is what I see (many lines like this):
[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing
[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing
Here is the code:
@SpringBootTest(classes = StreamApp.class)
@EmbeddedKafka(partitions = 3,
        topics = {
                "${kafka-demo.topics.input.name}",
                "${kafka-demo.topics.output.name}"
        },
        brokerProperties = {
                "transaction.state.log.replication.factor=1",
                "offsets.topic.replication.factor=1",
                "transaction.state.log.min.isr=1"
        })
public class ScalingIT {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Autowired
    private JsonSerde<Message> messageSerde;

    private List<Consumer<String, Message>> messageConsumers = new ArrayList<>();

    @Autowired
    private KafkaDemoProps props;

    @BeforeEach
    void setup() {
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
    }

    @AfterEach
    public void teardown() {
        messageConsumers.forEach(c -> {
            if (c != null) {
                c.close();
            }
        });
    }

    private <K, V> Consumer<K, V> consumer(String topic, Serde<K> keySerde, Serde<V> valueSerde) {
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("common-group", "false", this.broker);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), valueSerde.deserializer());
        Consumer<K, V> consumer = kafkaConsumerFactory.createConsumer();
        this.broker.consumeFromAnEmbeddedTopic(consumer, topic);
        return consumer;
    }

    @Test
    @DirtiesContext
    void allSentMessagesGoVia3Consumers() {
        int numRecords = 100;
        IntStream.range(0, numRecords).forEach(i -> {
            template.send(props.topics().input().name(), String.format("{\"SomeProp\":\"%s\"}", i).getBytes());
        });
        ConsumerRecords<String, Message> records1 = KafkaTestUtils.getRecords(this.messageConsumers.get(0));
        ConsumerRecords<String, Message> records2 = KafkaTestUtils.getRecords(this.messageConsumers.get(1));
        ConsumerRecords<String, Message> records3 = KafkaTestUtils.getRecords(this.messageConsumers.get(2));
        List<ConsumerRecord<String, Message>> allRecords = new ArrayList<>();
        records1.forEach(allRecords::add);
        records2.forEach(allRecords::add);
        records3.forEach(allRecords::add);
        assertThat(allRecords.size()).isEqualTo(numRecords);
    }
}
Does anyone have an idea how to test multiple consumers in the same group?
consumeFromAnEmbeddedTopic() wasn't designed to be used that way; the problem is that when the second consumer polls, the first consumer needs to call poll() for the rebalance to complete.
It was only intended to support a single consumer.
You should be able to use manual assignment instead of group management.
Replace
with
You will also need
on the embedded broker so that the auto configured template will send the records there.
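As a rough sketch of the manual-assignment idea (an assumption about the intended change, not the exact snippet): the code below assumes the call being replaced is consumeFromAnEmbeddedTopic(...) in the question's consumer(...) method, and that each consumer is pinned to a single partition. ManualAssignment, onePartition and assignManually are hypothetical names, not part of spring-kafka; only TopicPartition and Consumer.assign(...) come from kafka-clients.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class, for illustration only.
final class ManualAssignment {

    private ManualAssignment() {
    }

    // Builds the single-element partition list handed to assign().
    static List<TopicPartition> onePartition(String topic, int partition) {
        return List.of(new TopicPartition(topic, partition));
    }

    // Pins the consumer to one partition. assign() bypasses group management,
    // so no rebalance takes place and no other consumer has to poll first.
    static <K, V> void assignManually(Consumer<K, V> consumer, String topic, int partition) {
        consumer.assign(onePartition(topic, partition));
    }
}
```

In the question's setup() this would mean creating each consumer as before and then calling, for example, ManualAssignment.assignManually(consumer, topic, 0), with 1 and 2 for the other two consumers. For the broker setting, one likely candidate is bootstrapServersProperty = "spring.kafka.bootstrap-servers" on @EmbeddedKafka, which points Spring Boot's auto-configured clients at the embedded broker; treat that as an assumption as well.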
EDIT
I also changed your test to send a deterministic number of records to each partition and specifically to wait for that number to arrive...
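To illustrate the idea of deterministic per-partition counts, here is a small sketch in plain Java with no Kafka dependency. It assumes records are spread round-robin by index, which may differ from the distribution used in the actual edit. DeterministicPartitions and its methods are hypothetical names, and the Supplier stands in for something like a consumer poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper class, for illustration only.
final class DeterministicPartitions {

    private DeterministicPartitions() {
    }

    // Partition chosen for the i-th record under explicit round-robin.
    static int partitionFor(int recordIndex, int partitionCount) {
        return recordIndex % partitionCount;
    }

    // Number of records each partition receives under the rule above.
    static int[] expectedCounts(int numRecords, int partitionCount) {
        int[] counts = new int[partitionCount];
        for (int i = 0; i < numRecords; i++) {
            counts[partitionFor(i, partitionCount)]++;
        }
        return counts;
    }

    // Keeps fetching batches until at least `expected` items have arrived
    // or the timeout has elapsed, whichever comes first.
    static <T> List<T> collectUntil(Supplier<List<T>> poller, int expected, long timeoutMillis) {
        List<T> received = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (received.size() < expected && System.currentTimeMillis() < deadline) {
            received.addAll(poller.get());
        }
        return received;
    }
}
```

With KafkaTemplate, the partition can be chosen explicitly through the send(topic, partition, key, value) overload, so the i-th record could go to partitionFor(i, 3). Each consumer would then wait for its own expected count rather than rely on a single getRecords call. Whether the partition carries over from the input topic to the output topic depends on the stream application.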