Camel JMS request-reply with 'n' reply messages

921 views Asked by At

I am using Camel JMS component for request-reply for communication with MQ. For some of my requests I can receive n messages in reply. How can I aggregate these reply messages?

I thought of using aggregator pattern with aggregation strategy, but can't use it as I am not sure on number of messages which can come in reply.

Can community help me understand what's the right way to do it? I did some google search but couldn't find something useful. Below is my sample route code

from("direct:"+routeName).routeId(routeName)
                        .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                        .circuitBreaker()
                            .resilience4jConfiguration()
                            .minimumNumberOfCalls(3)
                        .end()
                        .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                            .log("${body}")
                            .unmarshal(customerDetailsOutBound)
                            .process(new Processor() {
                                    @Override
                                    public void process(Exchange exchange) throws Exception {
                                        System.out.println(exchange.getIn().getBody().toString());
                                    }
                            })
                        .onFallback().process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Store this message to backup");
                            }
                        })
                        .end();

Looking forward to get some good insights from community. Thank you.

3

There are 3 answers

3
burki On

Well, traditional request-reply has by design just 1 reply message. The thread waiting for the response stops listening as soon as the first reply arrives.

With JMS correlation IDs (no dedicated thread per request) it would theoretically be possible to receive multiple replies for the same request, but I don't know if this really works/is allowed in JMS.

Update based on comments

You write in comments that you are able to receive multiple JMS replies for one request and that you even get the number of answers to expect.

If this all works, you can use the Aggregator EIP in your Camel route to collect all responses before sending a reply to the caller.

The Aggregator is highly configurable. You can decide how to combine the responses and you can also define multiple completion criteria (timeout, number of messages etc).

7
Kavithakaran Kanapathippillai On

Message flow

  1. your first route sends a message to CAMELDEMO queue and start waiting for a single aggreagted message on a new queue CAMELDEMO_AGGREGATED_REPLY
  2. component that received the message on CAMELDEMO, start sending responses to CAMELDEMOREPLY queue and also indicates how many responses will be sent
  3. Second route below starts listening on CAMELDEMOREPLY, aggregates the message and send the aggregated message to CAMELDEMO_AGGREGATED_REPLY.
  4. Your first route that was waiting for the reply on CAMELDEMO_AGGREGATED_REPLY gets the aggregated reply, receives single message and sends it back

Original route updated to await for reply on CAMELDEMO_AGGREGATED_REPLY

...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&
                replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
})
....

Second route to aggregate the messages

from(mqComponentBeanName+"://CAMELDEMOREPLY?
                          exchangePattern=In&requestTimeout=10000)
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?
                          exchangePattern=Out&requestTimeout=10000)
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExch, Exchange newExchange) 
    {
        ...
        //Here you check your flag regarding the number of responses
        // you were supposed to receive, and if it is met
        // complete the aggregation by setting it to true
        oldExch.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
                ...
         return oldExchange;
     }
}
0
gomzee On

I was able to solve this with single route. Solution may not be that neat, but works and fulfils the purpose. I have used loopDoWhile and in the processor inside loopDoWhile I am fetching message from queue using plain java code.

from("direct:"+routeName).routeId(routeName)
                    .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                    .circuitBreaker()
                        .resilience4jConfiguration()
                        .minimumNumberOfCalls(3)
                    .end()
                    .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                        .log("${body}")
                        .unmarshal(customerDetailsOutBound)
                        .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    System.out.println(exchange.getIn().getBody().toString());


int msgCount = getMsgCountfromFirstReposnse;
if (msgCount > 1) {
exchange.getIn().setHeader("COUNTER", 0);
exchange.getIn().setHeader("MSG_COUNT", msgCount-1);
exchange.setProperty("connectionFactory", connectionFactory);
}
                                }
                        })
                    .loopDoWhile(simple("${headers.COUNTER} != ${headers.MSG_COUNT}"))
                            .process(simpleJMSConsumerProcess)
                        .end().endCircuitBreaker()
                    .onFallback().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Store this message to backup");
                        }
                    })

Code inside processor:

ConnectionFactory connectionFactory = (ConnectionFactory) exchange.getProperty("connectionFactory");
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

    try {
        Queue queue = session.createQueue("CAMELDEMOREPLY?consumer.priority=10");
        MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '"+exchange.getIn().getHeader("JMSCorrelationID").toString()+"'");
        connection.start();
        TextMessage textMsg = (TextMessage) consumer.receive();
        System.out.println(textMsg);
        System.out.println("Received: " + textMsg.getText());
        exchange.getIn().setHeader("COUNTER", ((Integer)exchange.getIn().getHeader("COUNTER"))+1);
        if (connection != null) {
            connection.close();
        }
    } finally {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }