Recursively read files with Spring Integration SFTP DSL


I am trying to use Spring Integration SFTP to read .txt files recursively from all subfolders of a directory on a remote server. The remote folder is something like "/tmp/remoteFolder", and all subfolders are date folders such as "/tmp/remoteFolder/20180830" and "/tmp/remoteFolder/20170902".

This is the code I have so far:

@Bean
@InboundChannelAdapter(value = "sftpMgetInputChannel",
    poller = @Poller(fixedDelay = "5000"))
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
             Command.MGET, "'/tmp/remoteDirectory/*'")
            .options(Option.RECURSIVE)
            .regexFileNameFilter("((\\d{8})|*\\.txt)")
            .localDirectoryExpression("sftp-inbound" + "/" + "#remoteDirectory"))
            .channel(remoteFileOutputChannel())
            .get();
}

@Bean
public MessageChannel sftpMgetInboundChannel(){
   return new DirectChannel();
}

@Bean
public PollableChannel remoteFileOutputChannel() {
    return new QueueChannel();
}

How do I specify /tmp/remoteFolder as the root remote directory for the SFTP MGET? Why isn't this working? And why do I need to specify the output channel?

Update: instead of calling channel(remoteFileOutputChannel()), I now call a handler like this:

@Bean
public MessageHandler messageHandler(){
 return new MessageHandler() { ... }
}

Updated code:


    @InboundChannelAdapter(value = "sftpMgetInputChannel",
        poller = @Poller(fixedDelay = "5000"))
    public String filesForMGET(){
      return "'/tmp/input/remoteDirectory/*'";
    }

    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlows.from("sftpMgetInputChannel")
                .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
                 Command.MGET, "payload")
                .options(Option.RECURSIVE)
                .regexFileNameFilter("((\\d{8})|*\\.txt)")
                .localDirectoryExpression("'sftp-inbound/'" + "#remoteDirectory"))
                .handle(messageHandler())
                .get();
    }

    @Bean
    public MessageChannel sftpMgetInboundChannel(){
       return new DirectChannel();
    }

    @Bean
    public MessageHandler messageHandler(){
     return new MessageHandler() { ... }
    }

With this updated code, I get the following error:


    rg.springframework.core.NestedIOException: failed to read file; nested exception is 2: No such file
        at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:100)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.read(CachingSessionFactory.java:137)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.copyFileToLocalDirectory(AbstractInboundFileSynchronizer.java:176)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:138)
        at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:144)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:207)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:202)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:680)
    Caused by: 2: No such file
        at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2289)
        at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:1741)
        at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1011)
        at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:986)
        at org.springframework.integration.sftp.session.SftpSession.read(SftpSession.java:96)
        ... 22 more



Answer by Gary Russell (best answer)

With the expression set to payload (as was the case in your question before the edit), the message payload sent to the gateway should be the plain string /tmp/remoteFolder/* (without SpEL quotes). Internally it is split into the remote directory (/tmp/remoteFolder) and the remote filename pattern (*).
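As a rough illustration of that split, here is a plain-Java sketch that divides a remote path at its last '/'. The class and method names are hypothetical, and this is a simplified model, not the gateway's actual implementation. It also shows what happens in this model when the payload carries literal SpEL quotes, as the updated filesForMGET() in the question does: the quote characters end up in both the directory and the pattern, so neither matches a real path.

```java
import java.util.Arrays;

// Hypothetical sketch: NOT Spring Integration source code, only the idea of
// splitting an MGET path into a directory part and a file-name pattern.
final class MgetPathSplit {

    /** Returns {directory, filenamePattern}, split at the last '/'. */
    static String[] split(String remotePath) {
        int slash = remotePath.lastIndexOf('/');
        if (slash < 0) {
            // No directory component: the whole string is the pattern.
            return new String[] {"", remotePath};
        }
        return new String[] {
            remotePath.substring(0, slash),   // directory to list
            remotePath.substring(slash + 1)   // pattern to match within it
        };
    }

    public static void main(String[] args) {
        // Plain payload: directory "/tmp/remoteFolder", pattern "*"
        System.out.println(Arrays.toString(split("/tmp/remoteFolder/*")));
        // Payload with literal quotes: the quotes leak into both parts
        System.out.println(Arrays.toString(split("'/tmp/remoteFolder/*'")));
    }
}
```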

Why do I need to specify the output channel?

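MGET is a gateway, so it produces a reply: a message whose payload is the List<File> of retrieved files. That reply needs to go somewhere, either to an output channel or to a downstream handler (as in your update).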

EDIT

You misunderstood: you can't put the @InboundChannelAdapter annotation on the IntegrationFlow bean. Put it on a separate method whose return value becomes the message payload, something like this:

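@InboundChannelAdapter(value = "sftpMgetInputChannel",
    poller = @Poller(fixedDelay = "5000"))
public String filesForMGET() {
    // Plain string, no SpEL quotes; with RECURSIVE, the trailing * fetches
    // the whole tree under the directory
    return "/tmp/remoteDirectory/*";
}

@Bean
public IntegrationFlow sftpMGetFlow() {
    return IntegrationFlows.from("sftpMgetInputChannel")
            .handleWithAdapter(h -> h.sftpGateway(this.sftpSessionFactory,
             Command.MGET, "payload")
            .options(Option.RECURSIVE)
            // Let the date directories through as well as the .txt files
            .regexFileNameFilter("(\\d{8}|.*\\.txt)")
            // SpEL: a literal prefix concatenated with each file's remote directory
            .localDirectoryExpression("'sftp-inbound/' + #remoteDirectory"))
            // The reply (List<File>) is sent here
            .channel(remoteFileOutputChannel())
            .get();
}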
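Two string-level details in these snippets can be checked with plain Java (java.util.regex only; the class name below is hypothetical). First, the question's filter ((\\d{8})|*\\.txt) is not a valid Java regex: the * right after | has nothing to repeat, so Pattern.compile throws a PatternSyntaxException. (\\d{8}|.*\\.txt) is one pattern that accepts both the 8-digit date directories and the .txt files, assuming the filter matches each entry's full name (Matcher.matches()). Second, Java's + joins the localDirectoryExpression pieces before SpEL ever parses them, so SpEL only sees the concatenated text, which the sketch prints.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical helper: checks the filter regex and shows the expression text
// that the Java string concatenations actually produce.
final class MgetFilterCheck {

    // Pattern from the question: the '*' after '|' is a dangling quantifier.
    static final String ORIGINAL = "((\\d{8})|*\\.txt)";
    // Assumed fix: an 8-digit directory name OR any name ending in ".txt".
    static final String FIXED = "(\\d{8}|.*\\.txt)";

    static boolean compiles(String regex) {
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    /** Full-name match, assuming the filter uses Matcher.matches(). */
    static boolean accepts(String regex, String name) {
        return Pattern.compile(regex).matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println("original compiles: " + compiles(ORIGINAL));
        for (String name : List.of("20180830", "20170902", "report.txt", "report.csv", "archive")) {
            System.out.println(name + " -> " + accepts(FIXED, name));
        }

        // Java '+' runs first; SpEL receives only the resulting text.
        System.out.println("sftp-inbound" + "/" + "#remoteDirectory");
        System.out.println("'sftp-inbound/'" + "#remoteDirectory");
        System.out.println("'sftp-inbound/' + #remoteDirectory");
    }
}
```

Of the three printed expression strings, only the last is a string concatenation in SpEL. The first is not a string literal at all (SpEL would read it as arithmetic on properties named sftp and inbound), and the second leaves a literal and a variable side by side with no operator, which SpEL should reject as a parse error.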