My understanding of a route (in Apache Camel verbiage) is that it represents a flow of data from one endpoint to another, and that it will stop at various processors along the way that perform EIP-type operations on the data.
If that's a correct/fair assessment of a route, then I am modeling a problem that I believe requires several routes inside the same CamelContext (I'm using Spring):
- Route 1: Extracts data from Source-1, processes it, converts it to a List<SomePOJO>, and then sends it to an aggregator
- Route 2: Extracts data from Source-2, processes it, also converts it to a List<SomePOJO>, and then sends it to an aggregator
- Route 3: Contains an aggregator that waits until it receives a List<SomePOJO> from both Route 1 and Route 2, and then continues processing an aggregated list
Here's the thing: both List<SomePOJO>s need to arrive at the aggregator at the same time, or rather, the aggregator bean has to wait until it has received data from both routes before it can aggregate the 2 lists into a single List<SomePOJO> and send the aggregated list off to the rest of Route 3.
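The waiting behaviour described above can be sketched in plain Java, with no Camel involved. This is only an illustration of the requirement, not of how Camel implements aggregation: `ListJoiner` and `offer` are hypothetical names, and the sketch assumes the number of expected lists (two here) is known up front.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: collects lists until the expected number has arrived.
final class ListJoiner<T> {
    private final int expected;                      // how many lists to wait for
    private final List<T> merged = new ArrayList<>();
    private int received = 0;

    ListJoiner(int expected) {
        this.expected = expected;
    }

    // Adds one list; returns the merged result only once all lists have arrived.
    synchronized Optional<List<T>> offer(List<T> part) {
        merged.addAll(part);
        received++;
        if (received < expected) {
            return Optional.empty();                 // still waiting for the other route
        }
        return Optional.of(new ArrayList<>(merged)); // complete: hand the list downstream
    }
}
```

The first call to `offer` returns an empty `Optional`, which corresponds to the aggregator holding the list from the fast route; the second call returns the combined list, which is the point at which the rest of Route 3 should run.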
So far I have the following pseudo-coded <camelContext>:
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />
        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>
    <!-- Route 2 -->
    <route id="route-2">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />
        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>
    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />
        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>
<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />
Then in Java:
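import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3+ the interface is org.apache.camel.AggregationStrategy.
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class ListAggregatorStrategy implements AggregationStrategy {
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first exchange of a group arrives with oldExchange == null.
        if (oldExchange == null) return newExchange;
        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(oldExchange.getIn().getBody(List.class));
        aggregateList.addAll(newExchange.getIn().getBody(List.class));
        oldExchange.getIn().setBody(aggregateList);
        return oldExchange;
    }
}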
My questions
- Is my basic setup correct? In other words, am I using the direct:aggregator endpoint correctly to send data out of route-1 and route-2 and into route-3's aggregator?
- Will my aggregator work the way I am expecting it to here? Say the extractor1 bean in route-1 takes only 5 seconds to run, but the extractor2 bean in route-2 takes 2 minutes to run. At t=5, the aggregator should receive the data from extractor1 and start waiting (for 2 mins) until extractor2 finishes and gives it the rest of the data to aggregate. Yes?
Sounds like you are on the right track; the Aggregator page has a lot of good information about this.
The <correlationExpression> is the key to matching an Exchange from each route, and completionSize can specify how many to wait for. In your case it looks like each route is only designed to run once, in which case the expression could use a fixed header value that both routes set on their Exchange; otherwise you would need something like a counter class for each route. Here is an update to your example: