Integration pattern : how to sync processing message received from multiple systems

314 views Asked by At

I am building a system that will receive messages via a Message broker (Currently, JMS) from different systems. All the messages from all the senders systems have a deviceId and there is no order in the reception of the message. For instance, system A can send a message with deviceId=1 and system b be can send a message with deviceId=2.

My goal is not to start processing of the messages concerning the same deviceId unless I got all the message from all the senders with the same deviceId.

For example, if I have 3 systems A, B and C sending messages to my system :

System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.

Should this problem be resolved by using some sync mechanism in my system , by the message broker or an integration framework like spring-integration/apache camel ?

3

There are 3 answers

0
mgyongyosi On BEST ANSWER

A similar solution with the Aggregator (what @Artem Bilan mentioned) can also be implemented in Camel with a custom AggregationStrategy and with controlling the Aggregator completion by using the Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP property.

The following might be a good starting point. (You can find the sample project with tests here)

Route:

from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");

SignalAggregationStrategy.java

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {

    private int numberOfSystems;

    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);

        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);

        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }

        return exchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}

Hope it helps!

0
Artem Bilan On

Spring Integration provides component for exactly this kind of tasks - do not emit until the whole group is collected. And it's name an Aggregator. Your deviceId is definitely a correlationKey. The releaseStrategy really may be based on the number of systems - how much deviceId1 messages you are waiting before proceed to the next step.

0
Souciance Eqdam Rashti On

You can do this in Apache Camel using a caching component. I think there is the EHCache component.

Essentially:

  1. You receive a message with a given deviceId say deviceId1.
  2. You look up in your cache to see which messages have been received for deviceId1.
  3. As long as you have not received all three you add the current system/message to the cache.
  4. Once all messages are there you process and clear the cache.

You could then off course route each incoming message to a specific deviceId based queue for temporary storage. This can be JMS, ActiveMQ or something similar.