How to Increase RabbitMQ + SQL Consumer Processing Speed in Apache Camel?

40 views Asked by At

Apache Camel : RabbitMQ + SQL, how to increase processing of consumers, for my application i want to send notification to subscribers for db changes in my database, my producer is producing at a high speed of 150/s, whereas my consumers are not able to run so fast, they can only process 20/s and hence are lagging in processing

`Below is the code :

`import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.builder.PredicateBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedCaseInsensitiveMap;

import java.util.concurrent.ExecutorService;

@Component
public class MyRouteBuilder extends RouteBuilder {

    @Value("${rabbitmq.host}")
    String rabbitmqHost;

    @Value("${rabbitmq.port}")
    Integer rabbitmqPort;

    @Value("${rabbitmq.username}")
    String rabbitmqUsername;

    @Value("${rabbitmq.password}")
    String rabbitmqPassword;

    @Value("${rabbitmq.queue}")
    String rabbitmqQueue;

    @Override
    public void configure() throws Exception {

        errorHandler(deadLetterChannel("log:dead?level=ERROR&showProperties=true"));

        ObjectMapper mapper = new ObjectMapper();
        JacksonDataFormat jacksonDataFormat = new JacksonDataFormat();
        jacksonDataFormat.setObjectMapper(mapper);

        from("spring-rabbitmq:default?queues=" + rabbitmqQueue + "&routingKey=" + rabbitmqQueue     +"&concurrentConsumers=10&maxConcurrentConsumers=10&asyncConsumer=true")
                .threads().executorService("messageProcessingThreadPool")
                .convertBodyTo(String.class)
                .unmarshal(jacksonDataFormat)
                .process(exchange -> {
                    LinkedCaseInsensitiveMap<String> map = exchange.getIn().getBody(LinkedCaseInsensitiveMap.class);
                })
                .log("Received db changes response: ${body}")
                .setProperty("dbChangesTableName", simple("${body.get(table_name)}"))
                .setProperty("dbChangesColumnName", simple("${body.get(column_name)}"))
                .setProperty("dbChangesOldValue", simple("${body.get(old_value)}"))
                .setProperty("dbChangesNewValue", simple("${body.get(new_value)}"))
                .setProperty("dbChangesPropertyId", simple("${body.get(property_id)}"))

                .to("sql:select w.id, w.format, w.webhook_url, w.table_name, w.column_name, wo.operation_type, " +
                        "wp.property_id from webhook w inner join webhook_operation wo on w.id = wo.webhook_id inner" +
                        " join webhook_property wp on w.id = wp.webhook_id")
                .split(body())
                .setProperty("webhookId", simple("${body.get(id)}"))
                .setProperty("webhookFormat", simple("${body.get(format)}"))
                .setProperty("webhookUrl", simple("${body.get(webhook_url)}"))
                .setProperty("webhookTableName", simple("${body.get(table_name)}"))
                .setProperty("webhookColumnName", simple("${body.get(column_name)}"))
                .setProperty("webhookOperationType", simple("${body.get(operation_type)}"))
                .setProperty("webhookPropertyId", simple("${body.get(property_id)}"))
                .choice()
                .when(
                        PredicateBuilder.and(
                                simple("${exchangeProperty.dbChangesPropertyId} == ${exchangeProperty.webhookPropertyId}"),
                                PredicateBuilder.or(
                                        PredicateBuilder.and(
                                                simple("${exchangeProperty.dbChangesTableName} == ${exchangeProperty.webhookTableName}"),
                                                simple("${exchangeProperty.dbChangesColumnName} == ${exchangeProperty.webhookColumnName}")
                                        ),
                                        PredicateBuilder.and(
                                                simple("${exchangeProperty.webhookTableName} == 'ALL'"),
                                                simple("${exchangeProperty.webhookColumnName} == 'ALL'")
                                        )
                                )))
                .setHeader("CamelHttpMethod", constant("POST"))
                .setHeader(Exchange.CONTENT_TYPE, constant("application/json; charset=utf-8"))
                .setBody(simple("""
                        {
                          "tableName": "${exchangeProperty.dbChangesTableName}",
                          "columnName": "${exchangeProperty.dbChangesColumnName}",
                          "oldValue": "${exchangeProperty.dbChangesOldValue}",
                          "propertyId": "${exchangeProperty.dbChangesPropertyId}",
                          "newValue": "${exchangeProperty.dbChangesNewValue}"
                        }"""))

                .setProperty("notificationBody", simple("${body}"))
                .removeHeader("Authorization")
                .toD("${exchangeProperty.webhookUrl}?clientConnectionManager=#connManager&throwExceptionOnFailure=false")
                .convertBodyTo(String.class)
                .setProperty("notificationResponse", simple("${body}"))
                .log("WebhookId : ${exchangeProperty.webhookId}, dbChangesPropertyId : ${exchangeProperty.dbChangesPropertyId}, " +
                        "notificationBody : ${exchangeProperty.notificationBody}, notificationResponse : ${exchangeProperty.notificationResponse}, " +
                        "ResponseCode : ${header.CamelHttpResponseCode}")
                .otherwise()
                .end();

        from("sql:select id,table_name, column_name, property_id, dtime, old_value, new_value, processed, act from public.db_changes where processed = 0 and property_id is not null")
                .routeId("dbChangesRoute")
                .log("Processing record..${body.get(id)}")
                .setProperty("change_id", simple("${body.get(id)}"))
                .marshal(jacksonDataFormat)
                .convertBodyTo(byte[].class)
                .to("spring-rabbitmq:default?routingKey=" + rabbitmqQueue)
                .to("sql:update public.db_changes set processed = 1 where id=:#${exchangeProperty.change_id}")
                .onException(Exception.class)
                .handled(true)
                .log("${exception} ${exception.stacktrace} occurred while processing ${body}")
                .end();


        from("direct:threadPoolSetup")
                .setBody(constant("Initialize thread pool"))
                .log("Initializing thread pool...")
                .to("log:ThreadPool")
                .bean("myRouteBuilder", "initializeThreadPool")
                .log("Thread pool initialized");


        from("timer://threadPoolTimer?repeatCount=1")
                .routeId("threadPoolSetup")
                .to("direct:threadPoolSetup");

    }

    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

    @Bean("connManager")
    PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {
        PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
        connManager.setMaxTotal(10);
        return connManager;
    }


    @Bean("messageProcessingThreadPool")
    public ExecutorService initializeThreadPool() {
        int threadPoolSize = 100; 
        return getContext().getExecutorServiceManager().newFixedThreadPool(this, "messageProcessingThreadPool", threadPoolSize);
    }

}``

PS : i am new to apache camel, hence there can be lot of improvements in code, please suggest some solution so that i can increase my consumers speed

0

There are 0 answers