As demonstrated in the diagram, out of n messages published on the queue - /queue/msgs
, the distribution among the consumers (STOMP-Consumer 1
and STOMP-Consumer 2
) is uneven. I could observe, STOMP-Consumer 2
only received one message out of n messages.
Exact same STOMP headers are passed by both the consumers. Those are as follows -
STOMP CONNECT Headers
- client-id: app
STOMP SUBSCRIBE Headers
- durable-subscription-name: app-subscription
- auto-delete: false
- ack: client-individual
- destination: /queue/msgs
Broker.xml
<?xml version="1.0"?>
<configuration
xmlns="urn:activemq"
xmlns:xi="http://www.w3.org/2001/XInclude"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core
xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
<name>activemq-558f6696fc-2kx8q</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>24000</journal-buffer-timeout>
<journal-max-io>4096</journal-max-io>
<disk-scan-period>5000</disk-scan-period>
<max-disk-usage>90</max-disk-usage>
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>368000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<message-expiry-scan-period>30000</message-expiry-scan-period>
<connection-ttl-override>60000</connection-ttl-override>
<address-settings>
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<address-setting match="/queue/#">
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
</address-setting>
<address-setting match="/topic/#">
<default-address-routing-type>MULTICAST</default-address-routing-type>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
</address-setting>
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-jms-queues>false</auto-delete-jms-queues>
<auto-delete-jms-topics>false</auto-delete-jms-topics>
<auto-delete-addresses>false</auto-delete-addresses>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<redelivery-delay-multiplier>1</redelivery-delay-multiplier>
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<max-redelivery-delay>50000</max-redelivery-delay>
<default-consumer-window-size>0</default-consumer-window-size>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ"/>
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue"/>
</anycast>
</address>
</addresses>
<wildcard-addresses>
<routing-enabled>true</routing-enabled>
<delimiter>/</delimiter>
<any-words>#</any-words>
<single-word>*</single-word>
</wildcard-addresses>
</core>
</configuration>
The acceptor in use is the artemis acceptor with port 61616
Use the
acceptor
URL parameterstompConsumerCredits
. The default value for this is10240
bytes which means that the broker will dispatch 10KB worth of messages to each client. If the first client processes these messages quickly enough then other consumers can starve. Assuming that your STOMP consumers are usingstomp
acceptor
then you could try something like this:Keep in mind that dispatching messages to consumers in batches based on
stompConsumerCredits
is a performance optimization which prevents lots of network round-trips between the client and the broker to fetch messages. I recommend you test different values forstompConsumerCredits
to find the optimum performance for your use-case as using a small value for 2 consumers may actually reduce overall message throughput vs. a large value for 1 consumer.Also, it's worth noting that in ActiveMQ Artemis 2.16.0 (not yet released) you'll be able to use a custom header (i.e.
consumer-window-size
) on the STOMPSUBSCRIBE
frame to tune this per client instead of having it set at theacceptor
level for all consumers.