rocketmq: 4.9.6 rocketmq-java-sdk: 4.9.6 jdk: 1.8
Using DefaultLitePullConsumer, there are more than 10,000 messages in the queue that have not been consumed. In the case of setPullBatchSize(500), the number of messages obtained is 32.
` String group = "group_test";
String topic = "topic_test";
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
DefaultLitePullConsumerImpl impl;
void fillImpl() throws NoSuchFieldException, IllegalAccessException {
Class<DefaultLitePullConsumer> cls = DefaultLitePullConsumer.class;
Field field = cls.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
impl = (DefaultLitePullConsumerImpl) field.get(consumer);
}
long maxOffset(MessageQueue queue) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<DefaultLitePullConsumerImpl> cls = DefaultLitePullConsumerImpl.class;
Method method = cls.getDeclaredMethod("maxOffset", MessageQueue.class);
method.setAccessible(true);
return (long) method.invoke(impl, queue);
}
@Test
void test() throws MQClientException, NoSuchFieldException, IllegalAccessException, InvocationTargetException,
NoSuchMethodException {
fillImpl();
consumer.setNamesrvAddr("192.168.1.10:9876");
consumer.setConsumerGroup(group);
consumer.subscribe(topic, (String) null);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setPullBatchSize(500);
consumer.setPullThreadNums(10);
consumer.start();
boolean allowRun = true;
while (allowRun) {
long diff = diff();
List<MessageExt> polled = consumer.poll();
System.out.printf("diff: %d; size: %d%n", diff, polled.size());
if (polled.size() > 32) {
allowRun = false;
}
}
assertFalse(allowRun);
}
public long diff()
throws MQClientException, InvocationTargetException, NoSuchMethodException, IllegalAccessException {
Collection<MessageQueue> queues = consumer.fetchMessageQueues(topic);
long diff = 0;
for (MessageQueue queue : queues) {
OffsetStore store = consumer.getOffsetStore();
// 当前偏移量
long offset = store.readOffset(queue, ReadOffsetType.READ_FROM_MEMORY);
// 最大偏移量
long max = maxOffset(queue);
diff += max - offset;
}
return diff;
}
`
I tried setting setPullBatchSize, setPullThreadNums and consumerManageThreadPoolNums=6400, defaultQueryMaxNum=8400 in rocketmq server broke.conf. All changes did not make the message size of a single pull larger than 32
edit broker.config
maxTransferBytesOnMessageInMemory=2097152
maxTransferCountOnMessageInMemory=256
maxTransferBytesOnMessageInDisk=262144
maxTransferCountOnMessageInDisk=64