I have 4 consumers consuming data for a topic using a consumer group id. Out of 4 consumers, 2 of them getting data of only one partition though they are assigned to multiple partitions, and 2 of them getting data of all the partitions assigned to that consumer.
How to make the consumers get data of all the partitions assigned to it?
I am using fs2-kafka.
val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"
val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
.withEnableAutoCommit(false)
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
.withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
.withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
.withBootstrapServers(brokers)
.withClientId(UUID.randomUUID().toString)
.withGroupId("testConsumerGroupId")
def processChunks(
data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
for {
_ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
offset <- fs2.Stream.chunk(data).map(_.offset)
} yield offset
def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
_.chunks
.evalMap { chunk =>
for {
_ <- CommittableOffsetBatch.fromFoldable(chunk).commit
} yield ()
}
consumerStream[F]
.using(kafkaConsumerSettings.consumerSettings)
.evalTap(_.subscribe("topic1"))
.flatMap(_.partitionedStream)
.flatMap { partionedStream =>
partionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
}
.parJoinUnbounded
.compile
.drain