How to skip the header row (first line) of a file in Spring Cloud Stream using the DSL?


I am using Spring Cloud Stream to read a file, split it with a file splitter, and emit each line as a message, all in DSL style. The file I am reading has a header row, and I am wondering whether there is an easy way to skip it, either before or after reading.

Any help is appreciated.

Here is how my splitter and integration flow look:

    return IntegrationFlows
            .from("....")
            .split(Files.splitter(true, true)
                            .charset(StandardCharsets.UTF_8)
                            .applySequence(true), // emit sequenceNumber as a message header
                    e -> e.id("fileSplitter")
            );


    IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();

There are 2 answers

6
Vinicius Carvalho (best answer)

If I read this right, you are using one of our out-of-the-box (OOB) apps, the file source: https://github.com/spring-cloud-stream-app-starters/file/blob/master/spring-cloud-starter-stream-source-file/README.adoc and deploying it with a Spring Cloud Data Flow DSL definition such as stream create file --file.consumer.mode=lines --file.directory=/tmp/ | sink. Is that correct?

If so, each line carries a special header called sequenceNumber when files are read in lines mode with sequencing applied (applySequence(true) in your flow). You can add a filter in between that drops the header-row message based on a header expression.
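The header-based filter described above can be sketched as a small predicate over the message headers. This is only an illustration, not the file source's own code: HeaderRowFilter and isDataLine are hypothetical names, and the sequence number of the header row is passed in as a parameter because it depends on the splitter setup. It is presumably 1 when no markers are emitted; with markers enabled, as in Files.splitter(true, true), the START marker is probably counted first, so check the actual value before relying on it.

```java
import java.util.Map;

/** Hypothetical helper: decides whether a split line should be kept. */
final class HeaderRowFilter {

    // Header name Spring Integration uses for the position of a split item.
    static final String SEQUENCE_NUMBER = "sequenceNumber";

    private HeaderRowFilter() {
    }

    /**
     * Returns false only for the message whose sequenceNumber header equals
     * headerRowSequence. Messages without a numeric sequenceNumber are kept.
     *
     * @param headers           the message headers (MessageHeaders is a Map)
     * @param headerRowSequence assumed sequence number of the file's header row
     */
    static boolean isDataLine(Map<String, ?> headers, int headerRowSequence) {
        Object sequence = headers.get(SEQUENCE_NUMBER);
        if (!(sequence instanceof Number)) {
            return true; // no sequencing information: do not drop anything
        }
        return ((Number) sequence).intValue() != headerRowSequence;
    }
}
```

In a DSL flow this could be plugged in on the lines channel with something like .filter(Message.class, m -> HeaderRowFilter.isDataLine(m.getHeaders(), 1)). Keeping the check in a plain method makes it easy to unit test without starting a Spring context.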

0
Pavel

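Spring Integration 5.1.5 solution: FileSplitter.setFirstLineAsHeader(...) makes the splitter consume the first line instead of emitting it, and carry it in the named header (here "columns") on the message for every remaining line. In DSL style, the Files.splitter(...) spec appears to expose the same option as firstLineAsHeader("columns").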

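import java.io.File;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.splitter.FileSplitter;

// Polls ./data/input and emits each file found there as a message.
@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File("./data/input"));
    return messageSource;
}

@Bean
public IntegrationFlow folderFlow() {
    FileSplitter fileSplitter = new FileSplitter();
    // The first line is not emitted; it travels in the "columns" header instead.
    fileSplitter.setFirstLineAsHeader("columns");
    return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(1000)))
            .split(fileSplitter)
            .handle(System.out::println)
            .get();
}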