Summarization of the problem

In my project, we are trying to use Spring-Cloud-Stream (SCS) to connect to Solace. Eventually we plan to move to Kafka. So using SCS will help us move over to Kafka quite easily without any code changes and very minimal configuration & dependency changes.

We had been using Solace for a while using JMS. Now when we tried to publish messages to Solace using SCS, we observed that in the message, some crucial JMS Headers (JMSMessageID, JMSType, JMSPriority,JMSCorrelationID, JMSExpiration) are blank.

Do we need to configure the JMS headers separately ? If yes, how ?

What I've already tried

I tried to set headers like this, but this is just resulting in duplicate headers with the same name.

    @Output(SendReport.TO_NMR)
    public void sendMessage(String request) {
        log.info("****************** Got this Report Request: " + request);

        MessageBuilder<String> builder = MessageBuilder.withPayload(request);
        builder.setHeader("JMSType","report-request");
        builder.setHeader("JMSMessageId","1");
        builder.setHeader("JMSCorrelationId","11");
        builder.setHeader("JMSMessageID","4");
        builder.setHeader("JMSCorrelationID","114");
        builder.setHeader("ApplicationMessageId","111");
        builder.setHeader("ApplicationMessageID","112");
        builder.setCorrelationId("23434");

        Message message = builder.build();
        sendReport.output().send(message);
    }

JMS Header of the message in Solace looks like this

JMSMessageID    
JMSDestination  TOPIC_NAME
JMSTimestamp    Wed Dec 31 18:00:00 CST 1969
JMSType 
JMSReplyTo  
JMSCorrelationID    
JMSExpiration   0
JMSPriority 0
JMSType nmr-report-request
JMSMessageId    1
JMSMessageID    4
_isJavaSerializedObject-contentType true
_isJavaSerializedObject-id  true
solaceSpringCloudStreamBinderVersion    0.1.0
ApplicationMessageId    111
ApplicationMessageID    112
JMSCorrelationId    11
JMSCorrelationID    114
correlationId   23434
id  [-84,-19,0,5,115,114,0,14,106,97,118,97,46,117,116,105,108,46,85,85,73,68,-68,-103,3,-9,-104,109,-123,47,2,0,2,74,0,12,108,101,97,115,116,83,105,103,66,105,116,115,74,0,11,109,111,115,116,83,105,103,66,105,116,115,120,112,13,-26,2,-51,111,-17,73,73,-18,-32,-26,-11,-46,-89,50,-37] (offset=377, length=80)
contentType [-84,-19,0,5,115,114,0,33,111,114,103,46,115,112,114,105,110,103,102,114,97,109,101,119,111,114,107,46,117,116,105,108,46,77,105,109,101,84,121,112,101,56,-76,29,-63,64,96,-36,-81,2,0,3,76,0,10,112,97,114,97,109,101,116,101,114,115,116,0,15,76,106,97,118,97,47,117,116,105,108,47,77,97,112,59,76,0,7,115,117,98,116,121,112,101,116,0,18,76,106,97,118] (offset=473, length=190)
timestamp   1555707627482

Code used to connect to Solace

Spring Boot Main Class

@SpringBootApplication
@EnableDiscoveryClient
@Slf4j
@EnableBinding({SendReport.class}) 
public class ReportServerApplication {

    public static void main(final String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-server.xml");
        new SpringApplicationBuilder(ReportServerApplication.class).listeners(new EnvironmentPreparedListener())                                                   .run(args);
}

Class to connect channel to topic:

public interface SendReport {

    String TO_NMR = "solace-poc-outbound";

    @Output(SendReport.TO_NMR)
    MessageChannel output();

}

Message Handler:

@Slf4j
@Component
@EnableBinding({SendReport.class})
public class MessageHandler {

    private SendReport sendReport;

    public MessageHandler(SendReport sendReport){
        this.sendReport = sendReport;
    }

    @Output(SendReport.TO_NMR)
    public void sendMessage(String request) {
        log.info("****************** Got this Report Request: " + request);
        var message = MessageBuilder.withPayload(request).build();
        sendReport.output().send(message);
    }
}

Properties used for configuration : application.yml

spring:
  cloud:
    # spring cloud stream binding
    stream:
      bindings:
        solace-poc-outbound:
          destination: TOPIC_NAME
          contentType: text/plain

solace:
  java:
    host: tcp://xyz.abc.com
    #port: xxx
    msgVpn: yyy
    clientUsername: aaa

Dependencies used:

'org.springframework.cloud:spring-cloud-stream',
'com.solace.spring.cloud:spring-cloud-starter-stream-solace:1.1.+'

Observation

  • Expected result : All JMS headers should get populated by SCS.
  • Actual result : Some JMS headers are not getting populated.

1 Answers

2
Artem Bilan On

See JMS Message JavaDocs:

/** Sets the message ID.
  *  
 * <P>This method is for use by JMS providers only to set this field 
 * when a message is sent. This message cannot be used by clients 
 * to configure the message ID. This method is public
 * to allow a JMS provider to set this field when sending a message
 * whose implementation is not its own.
  *
  * @param id the ID of the message
  *
  * @exception JMSException if the JMS provider fails to set the message ID 
  *                         due to some internal error.
  *
  * @see javax.jms.Message#getJMSMessageID()
  */ 

void
setJMSMessageID(String id) throws JMSException;

So, this property cannot be populated from the application level.

In the ActiveMQ I see the code like this:

 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

 // Set the message id.
 if (msg != message) {
        message.setJMSMessageID(msg.getMessageId().toString());

But still: it is not what we can control from the application level.

The priority, deliveryMode and timeToLive ca be populated from the JmsSendingMessageHandler:

if (this.jmsTemplate instanceof DynamicJmsTemplate && this.jmsTemplate.isExplicitQosEnabled()) {
        Integer priority = StaticMessageHeaderAccessor.getPriority(message);
        if (priority != null) {
            DynamicJmsTemplateProperties.setPriority(priority);
        }
        if (this.deliveryModeExpression != null) {
            Integer deliveryMode =
                    this.deliveryModeExpression.getValue(this.evaluationContext, message, Integer.class);

            if (deliveryMode != null) {
                DynamicJmsTemplateProperties.setDeliveryMode(deliveryMode);
            }
        }
        if (this.timeToLiveExpression != null) {
            Long timeToLive = this.timeToLiveExpression.getValue(this.evaluationContext, message, Long.class);
            if (timeToLive != null) {
                DynamicJmsTemplateProperties.setTimeToLive(timeToLive);
            }
        }
    }

The JmsCorrelationID must be populated by the JmsHeaders.CORRELATION_ID. The JmsType by the JmsHeaders.TYPE, respectively:

public void fromHeaders(MessageHeaders headers, javax.jms.Message jmsMessage) {
    try {
        Object jmsCorrelationId = headers.get(JmsHeaders.CORRELATION_ID);
        if (jmsCorrelationId instanceof Number) {
            jmsCorrelationId = jmsCorrelationId.toString();
        }
        if (jmsCorrelationId instanceof String) {
            try {
                jmsMessage.setJMSCorrelationID((String) jmsCorrelationId);
            }
            catch (Exception e) {
                this.logger.info("failed to set JMSCorrelationID, skipping", e);
            }
        }
        Object jmsReplyTo = headers.get(JmsHeaders.REPLY_TO);
        if (jmsReplyTo instanceof Destination) {
            try {
                jmsMessage.setJMSReplyTo((Destination) jmsReplyTo);
            }
            catch (Exception e) {
                this.logger.info("failed to set JMSReplyTo, skipping", e);
            }
        }
        Object jmsType = headers.get(JmsHeaders.TYPE);
        if (jmsType instanceof String) {
            try {
                jmsMessage.setJMSType((String) jmsType);
            }
            catch (Exception e) {
                this.logger.info("failed to set JMSType, skipping", e);
            }
        }

See DefaultJmsHeaderMapper for more info.