Am I using Apache Camel aggregator correctly?

My understanding of a route (in Apache Camel verbiage) is that it represents a flow of data from one endpoint to another, and that it will stop at various processors along the way that perform EIP-type operations on the data.

If that's a correct/fair assessment of a route, then I am modeling a problem that I believe requires several routes inside the same CamelContext (I'm using Spring):

  1. Route 1: Extracts data from Source-1, processes it, converts it to a List<SomePOJO> and then sends it to an aggregator
  2. Route 2: Extracts data from Source-2, processes it, also converts it to a List<SomePOJO> and then sends it to an aggregator
  3. Route 3: Contains an aggregator that waits until it receives a List<SomePOJO> from both Route 1 and Route 2, and then continues processing an aggregated list

Here's the thing: both List<SomePOJO>s need to arrive at the aggregator at the same time, or rather, the aggregator bean has to wait until it's received data from both routes before it can aggregate the 2 lists into a single List<SomePOJO> and send the aggregated list off to the rest of Route 3.

So far I have the following pseudo-coded <camelContext>:

<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 2 -->
    <route id="route-2">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />

        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>

<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />

Then in Java:

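import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3+ the interface is org.apache.camel.AggregationStrategy.
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class ListAggregatorStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first exchange in a group arrives with oldExchange == null.
        if (oldExchange == null) {
            return newExchange;
        }
        // Merge the incoming list into the list collected so far.
        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(oldExchange.getIn().getBody(List.class));
        aggregateList.addAll(newExchange.getIn().getBody(List.class));
        oldExchange.getIn().setBody(aggregateList);
        return oldExchange;
    }
}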

My questions

  1. Is my basic setup correct? In other words, am I using the direct:aggregator endpoint correctly to send data out of route-1 and route-2 and into route-3's aggregator?
  2. Will my aggregator work the way I am expecting it to here? Say the extractor1 bean in route-1 takes only 5 seconds to run, but the extractor2 bean in route-2 takes 2 minutes to run. At t=5, the aggregator should receive the data from extractor1 and start waiting (for 2 mins) until extractor2 finishes and gives it the rest of the data to aggregate. Yes?

There is 1 answer

Answered by bgossit

It sounds like you are on the right track. The Aggregator page in the Camel documentation has a lot of good information about this.

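The <correlationExpression> is the key to matching an Exchange from each route: exchanges whose expression evaluates to the same value are aggregated together, and completionSize specifies how many exchanges to wait for before the aggregate is released. With completionSize="2", the aggregator holds the first list until the second arrives, however long that takes. In your case each route is designed to run only once, so the expression can read a fixed header value set on each Exchange; if the routes ran repeatedly, you would need something like a counter class for each route to pair up matching runs.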

Here is an update to your example:

<route id="route-1">
    <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor1?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-2">
    <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor2?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-3">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="listAggregatorStrategy" completionSize="2">
        <correlationExpression>
            <simple>${header.id}</simple>
        </correlationExpression>
        <to uri="bean:lastProcessor?method=process" />
    </aggregate>
</route>
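
To make the waiting behavior concrete, here is a minimal plain-Java model of what correlation plus a completion size of 2 does. It is only a sketch and does not use the Camel API: the class name AggregatorModel and its offer method are hypothetical, and the real aggregator also deals with timeouts, repositories, and threading that are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of correlation plus completionSize; hypothetical, not Camel API. */
class AggregatorModel {
    private final int completionSize;
    // Partially aggregated bodies, keyed by correlation value (the "id" header in the XML).
    private final Map<String, List<String>> pending = new HashMap<>();
    // Number of messages received so far for each correlation value.
    private final Map<String, Integer> received = new HashMap<>();

    AggregatorModel(int completionSize) {
        this.completionSize = completionSize;
    }

    /**
     * Accepts one message. Returns the merged list once completionSize messages
     * with the same key have arrived; returns null while the group is still waiting.
     */
    synchronized List<String> offer(String correlationKey, List<String> body) {
        pending.computeIfAbsent(correlationKey, k -> new ArrayList<>()).addAll(body);
        int count = received.merge(correlationKey, 1, Integer::sum);
        if (count < completionSize) {
            return null; // keep holding what has arrived so far
        }
        received.remove(correlationKey);
        return pending.remove(correlationKey); // group complete: release it
    }

    public static void main(String[] args) {
        AggregatorModel aggregator = new AggregatorModel(2);
        // Stand-in for route-1 finishing first: nothing is released yet.
        System.out.println(aggregator.offer("myHeaderValue", Arrays.asList("a1", "a2")));
        // Stand-in for route-2 finishing later: the merged list is released.
        System.out.println(aggregator.offer("myHeaderValue", Arrays.asList("b1")));
    }
}
```

The first call prints null and the second prints [a1, a2, b1], which mirrors route-3 doing nothing with the output of extractor1 until extractor2 has delivered its list.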