Apache Camel: RabbitMQ + SQL — how can I increase the processing speed of my consumers? My application needs to send notifications to subscribers about changes in my database. My producer publishes at a high rate of about 150 messages/s, whereas my consumers cannot keep up: they can only process about 20 messages/s and are therefore lagging behind.
Below is the code:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedCaseInsensitiveMap;
import java.util.concurrent.ExecutorService;
@Component
public class MyRouteBuilder extends RouteBuilder {
@Value("${rabbitmq.host}")
String rabbitmqHost;
@Value("${rabbitmq.port}")
Integer rabbitmqPort;
@Value("${rabbitmq.username}")
String rabbitmqUsername;
@Value("${rabbitmq.password}")
String rabbitmqPassword;
@Value("${rabbitmq.queue}")
String rabbitmqQueue;
@Override
public void configure() throws Exception {
errorHandler(deadLetterChannel("log:dead?level=ERROR&showProperties=true"));
ObjectMapper mapper = new ObjectMapper();
JacksonDataFormat jacksonDataFormat = new JacksonDataFormat();
jacksonDataFormat.setObjectMapper(mapper);
from("spring-rabbitmq:default?queues=" + rabbitmqQueue + "&routingKey=" + rabbitmqQueue +"&concurrentConsumers=10&maxConcurrentConsumers=10&asyncConsumer=true")
.threads().executorService("messageProcessingThreadPool")
.convertBodyTo(String.class)
.unmarshal(jacksonDataFormat)
.process(exchange -> {
LinkedCaseInsensitiveMap<String> map = exchange.getIn().getBody(LinkedCaseInsensitiveMap.class);
})
.log("Received db changes response: ${body}")
.setProperty("dbChangesTableName", simple("${body.get(table_name)}"))
.setProperty("dbChangesColumnName", simple("${body.get(column_name)}"))
.setProperty("dbChangesOldValue", simple("${body.get(old_value)}"))
.setProperty("dbChangesNewValue", simple("${body.get(new_value)}"))
.setProperty("dbChangesPropertyId", simple("${body.get(property_id)}"))
.to("sql:select w.id, w.format, w.webhook_url, w.table_name, w.column_name, wo.operation_type, " +
"wp.property_id from webhook w inner join webhook_operation wo on w.id = wo.webhook_id inner" +
" join webhook_property wp on w.id = wp.webhook_id")
.split(body())
.setProperty("webhookId", simple("${body.get(id)}"))
.setProperty("webhookFormat", simple("${body.get(format)}"))
.setProperty("webhookUrl", simple("${body.get(webhook_url)}"))
.setProperty("webhookTableName", simple("${body.get(table_name)}"))
.setProperty("webhookColumnName", simple("${body.get(column_name)}"))
.setProperty("webhookOperationType", simple("${body.get(operation_type)}"))
.setProperty("webhookPropertyId", simple("${body.get(property_id)}"))
.choice()
.when(
PredicateBuilder.and(
simple("${exchangeProperty.dbChangesPropertyId} == ${exchangeProperty.webhookPropertyId}"),
PredicateBuilder.or(
PredicateBuilder.and(
simple("${exchangeProperty.dbChangesTableName} == ${exchangeProperty.webhookTableName}"),
simple("${exchangeProperty.dbChangesColumnName} == ${exchangeProperty.webhookColumnName}")
),
PredicateBuilder.and(
simple("${exchangeProperty.webhookTableName} == 'ALL'"),
simple("${exchangeProperty.webhookColumnName} == 'ALL'")
)
)))
.setHeader("CamelHttpMethod", constant("POST"))
.setHeader(Exchange.CONTENT_TYPE, constant("application/json; charset=utf-8"))
.setBody(simple("""
{
"tableName": "${exchangeProperty.dbChangesTableName}",
"columnName": "${exchangeProperty.dbChangesColumnName}",
"oldValue": "${exchangeProperty.dbChangesOldValue}",
"propertyId": "${exchangeProperty.dbChangesPropertyId}",
"newValue": "${exchangeProperty.dbChangesNewValue}"
}"""))
.setProperty("notificationBody", simple("${body}"))
.removeHeader("Authorization")
.toD("${exchangeProperty.webhookUrl}?clientConnectionManager=#connManager&throwExceptionOnFailure=false")
.convertBodyTo(String.class)
.setProperty("notificationResponse", simple("${body}"))
.log("WebhookId : ${exchangeProperty.webhookId}, dbChangesPropertyId : ${exchangeProperty.dbChangesPropertyId}, " +
"notificationBody : ${exchangeProperty.notificationBody}, notificationResponse : ${exchangeProperty.notificationResponse}, " +
"ResponseCode : ${header.CamelHttpResponseCode}")
.otherwise()
.end();
from("sql:select id,table_name, column_name, property_id, dtime, old_value, new_value, processed, act from public.db_changes where processed = 0 and property_id is not null")
.routeId("dbChangesRoute")
.log("Processing record..${body.get(id)}")
.setProperty("change_id", simple("${body.get(id)}"))
.marshal(jacksonDataFormat)
.convertBodyTo(byte[].class)
.to("spring-rabbitmq:default?routingKey=" + rabbitmqQueue)
.to("sql:update public.db_changes set processed = 1 where id=:#${exchangeProperty.change_id}")
.onException(Exception.class)
.handled(true)
.log("${exception} ${exception.stacktrace} occurred while processing ${body}")
.end();
from("direct:threadPoolSetup")
.setBody(constant("Initialize thread pool"))
.log("Initializing thread pool...")
.to("log:ThreadPool")
.bean("myRouteBuilder", "initializeThreadPool")
.log("Thread pool initialized");
from("timer://threadPoolTimer?repeatCount=1")
.routeId("threadPoolSetup")
.to("direct:threadPoolSetup");
}
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(rabbitmqPort);
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory;
}
@Bean("connManager")
PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
connManager.setMaxTotal(10);
return connManager;
}
@Bean("messageProcessingThreadPool")
public ExecutorService initializeThreadPool() {
int threadPoolSize = 100;
return getContext().getExecutorServiceManager().newFixedThreadPool(this, "messageProcessingThreadPool", threadPoolSize);
}
}``
PS: I am new to Apache Camel, so there may be a lot of room for improvement in the code. Please suggest a solution that lets me increase my consumers' processing speed.