How does one configure and use a Topic (publisher - subscriber model) in Red Hat AMQ 7 using AMQP 1.0 and consume in Spring Boot
I have configured the topic as follows in the Red Hat AMQ console
I have created a Spring Boot app
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
</parent>
<artifactId>amqp-1-topic-producer</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.amqphub.spring</groupId>
<artifactId>amqp-10-jms-spring-boot-starter</artifactId>
<version>2.5.13</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<skip>false</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
With application.properties:
amqphub.amqp10jms.remote-url=amqp://localhost:61616
amqphub.amqp10jms.username=admin
amqphub.amqp10jms.password=admin
my.topic.name=mySOQueue
Creating a publisher seems to be working
@Component
public class Producer {
private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
@Autowired
public JmsTemplate jmsTemplate;
@Value("${my.topic.name}")
public String queueName;
@Scheduled(fixedDelay = 2000L)
public void sendMessages() {
sendMessage("Hello World topic");
}
public void sendMessage(final String payload) {
LOG.info("============= Sending: " + payload);
this.jmsTemplate.convertAndSend(queueName, payload);
}
}
I am not getting any exceptions with the log output
============= Sending: Hello World topic Connection ID:9231142a-a196-48bf-987a-1b5d65fc6955:96
connected to server: amqp://localhost:61616
I however cannot see the message count go up in the Red Hat AMQ console. It stays at 0.
If I try to create a consumer
@Component
public class Consumer {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
@JmsListener(destination = "jms.topic.demoTopic")
public void processMsg(final String message) {
LOG.info("Received: " + message);
}
}
I get this error
Open of resource:(JmsConsumerInfo: { ID:efcdcfd8-eb5b-4438-b4ec-a8fa2cf8e9cf:1:1:1, destination = jms.topic.demoTopic }) failed: Address jms.topic.demoTopic is not configured for queue support [condition = amqp:illegal-state]
I know if I use ApacheMQ I can create an ActiveMQTopicConnectionFactory and configure that with a public subscriber model but I have no idea how to do it with the amqp-10.



When you want your JMS application to send a message to a JMS topic then all you need to configure on the broker (assuming that auto-creation is disabled) is an address that supports multicast (as noted in the documentation). You've already done that with
mySOAddress.Next, your application should then use the name of the address when it sends and consumes messages. Currently you're using the name of the queue (i.e.
mySOQueue) when you send messages. Use this instead in yourapplication.properties:Your
@JmsListenershould also use the name of the address, e.g.:Then you need to configure your Spring application to use JMS topic (i.e. pub/sub) semantics rather than queue (i.e. peer-to-peer) semantics. For the
JmsTemplateyou can usesetPubSubDomain(true). For the@JmsListenersee this answer.Lastly, you should remove the
mySOQueuequeue.