Camel Multi-threaded Consumer

I have a DB with orders, each with a due date and a creation date. I want to pull a maximum of 4 orders into a route and process them simultaneously. Each order may take between 10 and 20 minutes to process, and I would like to keep all four threads busy as much as possible, with no downtime.

Here is what I have now:

from("timer://GetOrder?fixedRate=true&period=1s")
            .to("bean:orderInfoDao?method=getNextOrder")
            .to("jms://process-orders")
            .end();

from("jms://process-orders?concurrentConsumers=4")
            .to("bean:orderService?method=processOrder(${body})")
            .to("direct:send-result")
            .end();

The getNextOrder DAO function returns the order with the oldest creation date among those that have passed their due date. Incoming orders are attempted immediately.
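As a rough illustration of that selection rule, here is an in-memory sketch. The class NextOrderSketch, the Order fields, and the method signature are hypothetical stand-ins; the real DAO queries the database and is not shown here.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class NextOrderSketch {

    // Hypothetical order shape; the real entity is not shown in the question.
    public static class Order {
        public final String id;
        public final Instant created;
        public final Instant due;

        public Order(String id, Instant created, Instant due) {
            this.id = id;
            this.created = created;
            this.due = due;
        }
    }

    // Among orders whose due date has passed, pick the one created earliest.
    public static Optional<Order> getNextOrder(List<Order> orders, Instant now) {
        return orders.stream()
                .filter(o -> o.due.isBefore(now))                 // past its due date
                .min(Comparator.comparing((Order o) -> o.created)); // oldest creation date
    }
}
```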

Right now, the issue is that orders pile up in the JMS queue because the timer keeps firing, so when getNextOrder returns a much older order, that order ends up far back in the queue.

Any ideas how I can structure these routes so that the DB is polled for the oldest 4 orders and those are executed simultaneously? Changes to the DAO are acceptable.

Is there any kind of multi-threaded producer?

Thanks in advance for suggestions!

There is 1 answer

Best answer, by Dragan Bozanovic:
// java.util.concurrent.Semaphore: one permit per concurrent consumer
final Semaphore semaphore = new Semaphore(4);

from("timer://GetOrder?period=1s")
            .to("bean:orderInfoDao?method=getNextOrder")
            .to("jms://process-orders")
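            .process(new Processor() {
                 // acquire() throws InterruptedException, so process must declare it
                 public void process(Exchange exchange) throws Exception {
                     // blocks the timer thread while all 4 permits are held
                     semaphore.acquire();
                 }
             })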
            .end();

from("jms://process-orders?concurrentConsumers=4")
            .to("bean:orderService?method=processOrder(${body})")
            .process(new Processor() {
                 public void process(Exchange exchange) {
                     // frees a permit so the timer route can continue
                     semaphore.release();
                 }
             })
            .to("direct:send-result")
            .end();

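Notice that the timer's fixedRate option is off (the default). The timer thread then simply blocks in acquire() while all permits are taken, instead of firing in rapid succession to catch up afterwards.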
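To see the throttling idea in isolation, here is a minimal plain-Java sketch without Camel or JMS. The class SemaphoreThrottleDemo, the run method, and the 20 ms sleep standing in for processOrder are all made up for illustration, and a thread pool plays the role of the four JMS consumers. Unlike the route above, the sketch takes the permit before handing an order off, and releases it in a finally block so a failing task does not leak a permit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreThrottleDemo {

    /**
     * Hands totalOrders tasks to a pool of `workers` threads while never
     * holding more than `permits` tasks in flight. Returns the highest
     * number of in-flight tasks that was observed.
     */
    public static int run(int totalOrders, int permits, int workers)
            throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < totalOrders; i++) {
            semaphore.acquire(); // producer blocks here while all permits are taken
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            pool.submit(() -> {
                try {
                    Thread.sleep(20); // stand-in for the long-running processOrder
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    semaphore.release(); // lets the producer fetch the next order
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max in flight: " + run(12, 4, 4));
    }
}
```

Because the counter is incremented only after acquire() and decremented before release(), the reported maximum can never exceed the number of permits.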

This is the first idea that came to my mind. I hope there are Camel EIPs that can implement this logic in a better way.