Build a Spring Integration release strategy using the Spring DSL


I am new to Spring Integration. I am trying to split a file into messages using the file splitter, then use .aggregate() to build a single message and send it to an output channel. I have markers set to true, so apply-sequence is now false by default. I have set correlationId to the constant "1" using enrichHeaders. I am having trouble setting the release strategy because I have no handle on the end of the sequence. Here is my code:

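    // Poll the directory, split each file into lines plus START/END markers,
    // then stamp every resulting message with the constant correlation key.
    IntegrationFlowBuilder integrationFlowBuilder = IntegrationFlows
            .from(s -> s.file(new File(fileDir))
                            .filter(getFileFilter(fileName)),
                    e -> e.poller(poller))
            .split(Files.splitter(true, true) // iterator = true, markers = true
                            .charset(StandardCharsets.US_ASCII),
                    e -> e.id(beanName))
            .enrichHeaders(h -> h.header("correlationId", "1"));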

    IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();

    @Bean
    public IntegrationFlow itemExcludes() {
        return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
                .aggregate(aggregator -> aggregator
                        .outputProcessor(group -> group.getMessages()
                        .stream()
                        .map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
                        .collect(Collectors.joining(","))))
                .transform(Transformers.toJson())
                .channel(customSource.itemExclude());
    }

    @Bean
    public IntegrationFlow itemExcludeMarkers() {
        return flow -> flow
                .log(LoggingHandler.Level.INFO)
                .<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                .<FileHandler>handle(new FileHandler(configProps))
                .channel(NULL_CHANNEL);
    }

Any help appreciated.


There are 2 answers

Gary Russell (best answer)

Use a custom release strategy that looks for the END marker in the last message and, perhaps, a custom output processor that removes the markers from the collection.
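The idea can be sketched in plain Java, as an illustration rather than a drop-in implementation. Marker and Mark below are hypothetical stand-ins for FileSplitter.FileMarker and its mark, and a List<Object> of payloads stands in for the message group. In a real flow the same checks would presumably sit in the aggregator's release strategy and output processor, and the markers would have to be routed to the aggregator along with the lines.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model only: Marker is a hypothetical stand-in for
// FileSplitter.FileMarker; the payload list stands in for the message group.
class EndMarkerReleaseSketch {

    enum Mark { START, END }

    static final class Marker {
        final Mark mark;

        Marker(Mark mark) {
            this.mark = mark;
        }
    }

    // Release once the most recently added payload is an END marker.
    static boolean canRelease(List<Object> payloads) {
        if (payloads.isEmpty()) {
            return false;
        }
        Object last = payloads.get(payloads.size() - 1);
        return last instanceof Marker && ((Marker) last).mark == Mark.END;
    }

    // Build the output from data payloads only, skipping the markers.
    static String joinWithoutMarkers(List<Object> payloads) {
        return payloads.stream()
                .filter(p -> !(p instanceof Marker))
                .map(Object::toString)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<Object> group = Arrays.asList(
                new Marker(Mark.START), "P-100", "P-200", new Marker(Mark.END));
        System.out.println(canRelease(group));
        System.out.println(joinWithoutMarkers(group));
    }
}
```

Checking only the last payload assumes messages reach the group in file order, which holds when a single thread splits and delivers a given file.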

Artem Bilan

I would move your header enricher for the correlationId before the splitter and write it like this:

    .enrichHeaders(h -> h
            .headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
                    m -> m.getHeaders().getId()))

A constant correlationId is definitely not good in a multi-threaded environment: different threads split different files and send their lines to the same aggregator. So, with "1" as the correlation key, you would always have just one group to aggregate and release. The default sequence behavior is to copy the original message id into the correlationId header. Since you are not going to rely on applySequence from the FileSplitter, I suggest this simple solution to emulate that behavior.
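To make the grouping problem concrete, here is a small plain-Java sketch, not Spring code. The Line class and the group helper are hypothetical, and the file name stands in for the per-file message id that the header function above would supply. It buckets interleaved lines by a key function, much as an aggregator buckets messages by correlation key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CorrelationKeySketch {

    // Hypothetical stand-in for one split line plus the file it came from.
    static final class Line {
        final String file;
        final String text;

        Line(String file, String text) {
            this.file = file;
            this.text = text;
        }
    }

    // Buckets lines by key, the way an aggregator groups by correlation key.
    static Map<String, List<String>> group(List<Line> lines, Function<Line, String> keyFn) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Line line : lines) {
            groups.computeIfAbsent(keyFn.apply(line), k -> new ArrayList<>()).add(line.text);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Interleaved arrival, as could happen when two threads split two files.
        List<Line> arrivals = Arrays.asList(
                new Line("a.txt", "A1"), new Line("b.txt", "B1"),
                new Line("a.txt", "A2"), new Line("b.txt", "B2"));
        System.out.println(group(arrivals, l -> "1"));    // constant key
        System.out.println(group(arrivals, l -> l.file)); // per-file key
    }
}
```

With the constant key, all four lines land in a single group that mixes both files. With the per-file key, each file gets its own group.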

As Gary pointed out in his answer, you need a custom ReleaseStrategy, and you need to send the FileSplitter.FileMarker to the aggregator as well. The END FileSplitter.FileMarker has a lineCount property which can be compared with MessageGroup.size() to decide whether the group is ready for release. The MessageGroupProcessor then has to filter out the FileSplitter.FileMarker messages when building the output.
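A minimal plain-Java sketch of that line-count comparison follows. EndMarker is a hypothetical stand-in for the END FileSplitter.FileMarker, and the payload list stands in for the MessageGroup. The sketch assumes only the END marker and the data lines reach the group, and it counts non-marker payloads instead of using the raw group size so that the marker itself does not skew the comparison.

```java
import java.util.ArrayList;
import java.util.List;

class LineCountReleaseSketch {

    // Hypothetical stand-in for the END FileSplitter.FileMarker and its lineCount.
    static final class EndMarker {
        final long lineCount;

        EndMarker(long lineCount) {
            this.lineCount = lineCount;
        }
    }

    // Release only when the END marker has arrived and the number of
    // non-marker payloads equals the line count that marker reports.
    static boolean canRelease(List<Object> payloads) {
        long expected = -1; // -1 means no END marker seen yet
        long lines = 0;
        for (Object payload : payloads) {
            if (payload instanceof EndMarker) {
                expected = ((EndMarker) payload).lineCount;
            } else {
                lines++;
            }
        }
        return expected >= 0 && lines == expected;
    }

    public static void main(String[] args) {
        List<Object> group = new ArrayList<>();
        group.add("P-100");
        group.add(new EndMarker(2)); // marker arrives before the last line
        System.out.println(canRelease(group));
        group.add("P-200");
        System.out.println(canRelease(group));
    }
}
```

Unlike a check on the last message alone, this comparison still works if the END marker overtakes some of the lines on the way to the aggregator.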