Stale Messages in INT_MESSAGE table Spring Integration Aggregator

141 views Asked by At

we have been using spring integration aggregator (version 5.1.4.RELEASE) in our project with the following configuration

<int:aggregator id="adobeJdbcAggregator" input-channel="aggregatorInputChannel" 
                                             output-channel="aggregatorOutputChannel"
                                             correlation-strategy-expression="headers['eventType']" 
                                             message-store="ccsJdbcMessageStore" 
                                             send-partial-result-on-expiry="true" 
                                              group-timeout-expression="#{aggregateGroupTimeoutValue}"
                                             expire-groups-upon-completion="true"
                                             expire-groups-upon-timeout="true" 
                                             release-strategy-expression="#{aggregateReleaseStrategyExpressionValue}"
                                             ref="ccsAggregatorBean" 
                                             method="processMessage"  
                                             auto-startup="true"/>

We occasionally notice some messages pending in INT_MESSAGE table as there are not supposed to be any pending after aggregation, could u suggest what could be the issue with it? Please note that I don't see any pending messages in either INT_MSG_GROUP or INT_GROUP_TO_MSG tables, we need to fix this issue where some messages are not aggregated.

Note:

We use group-timeout expression which expires the group based on a timestamp

1

There are 1 answers

15
Artem Bilan On

When message group is expired, the JdbcMessageStore performs these update queries:

public void removeMessageGroup(Object groupId) {
    String groupKey = getKey(groupId);

    this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), groupKey, this.region, this.region);

    if (logger.isDebugEnabled()) {
        logger.debug("Removing relationships for the group with group key=" + groupKey);
    }
    this.jdbcTemplate.update(getQuery(Query.REMOVE_GROUP_TO_MESSAGE_JOIN), groupKey, this.region);

    if (logger.isDebugEnabled()) {
        logger.debug("Deleting messages with group key=" + groupKey);
    }

    this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE_GROUP), groupKey, this.region);
}

So, the first one belongs to the mentioned INT_MESSAGE:

DELETE from INT_MESSAGE where MESSAGE_ID in 
(SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ? and REGION = ?) and REGION = ?

Then as you notices an INT_GROUP_TO_MESSAGE is cleaned from the group and so on for the INT_MESSAGE_GROUP.

Therefore it is not clear how those messages can be stale, if they belong to any group you are cleaning from the store.

On the other hand the INT_MESSAGE table is used for other scenarios where plain message manipulation is necessary (no group correlation). See more info in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/system-management.html#message-store. And a MessageStore interface. Therefore those messages could be stored from some other endpoints and they are not related to the mentioned aggregator anyway. For example Claim Check pattern uses a plain MessageStore abstraction.