Some consumers get data from only one partition while others get data from all assigned partitions, within the same consumer group

I have 4 consumers reading a topic with the same consumer group id. Every consumer is assigned multiple partitions, but 2 of the 4 receive data from only one of their assigned partitions, while the other 2 receive data from all of their assigned partitions.

How can I make every consumer receive data from all the partitions assigned to it?

I am using fs2-kafka.

val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"

val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
      .withEnableAutoCommit(false)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
      .withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
      .withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withBootstrapServers(brokers)
      .withClientId(UUID.randomUUID().toString)
      .withGroupId("testConsumerGroupId")

def processChunks(
    data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
  for {
    _ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
    offset <- fs2.Stream.chunk(data).map(_.offset)
  } yield offset

def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
  _.chunks
    .evalMap { chunk =>
      for {
        _ <- CommittableOffsetBatch.fromFoldable(chunk).commit
      } yield ()
    }

consumerStream[F]
  .using(kafkaConsumerSettings.consumerSettings)
  .evalTap(_.subscribe("topic1"))
  .flatMap(_.partitionedStream)
  .flatMap { partitionedStream =>
    partitionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
  }
  .parJoinUnbounded
  .compile
  .drain
