SFTP inbound adapter and transaction synchronization


I am using an SFTP inbound adapter with a custom filter. If the filter accepts a file, the file should move through the message flow and be delivered to a Spring Batch job via "batch-integration:job-launching-gateway"; this part works fine. The problem is the failure cases: if the filter rejects the file, I want it moved to a directory such as "/failed/", and if the JDBC transaction fails during batch processing, I also want the file moved to "/failed/" so that we can handle it later. How can I synchronize the JDBC transaction with this flow in Spring Integration?

I read the transactions chapter of the Spring Integration reference (http://docs.spring.io/spring-integration/reference/html/transactions.html), but it is not clear to me how to apply it here. A sample app would be ideal; otherwise some hints would also help.

Configuration

<context:component-scan base-package="com.sftp.test" />
<import resource="beans-context.xml" />

<!-- Default poller -->
<int:poller default="true" fixed-delay="5000"/>


<!-- SFTP inbound poller -->
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
    channel="fileMessageChannel" session-factory="sftpSessionFactory"
    local-directory="${test.sftp.local.dir}" remote-directory="${test.sftp.remote.dir}"
    remote-file-separator="/" auto-create-local-directory="true"
    delete-remote-files="true" filename-pattern="*" local-filter="testFileFilter"
    preserve-timestamp="true" temporary-file-suffix=".writing"
    >

    <int:poller cron="${test.file.poll.frequency}"
        max-messages-per-poll="1">
        <int:transactional transaction-manager="transactionManager"
            synchronization-factory="syncFactory" /> 
    </int:poller>
</int-sftp:inbound-channel-adapter>

<int:transaction-synchronization-factory
    id="syncFactory">
    <int:after-commit expression="payload.renameTo('/tmp/dds/test/success/' + payload.name)"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo('/tmp/dds/test/failed/' + payload.name)"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

<int:channel id="committedChannel">
    <int:queue />
</int:channel>

<int:logging-channel-adapter channel="committedChannel" />


<int:channel id="rolledBackChannel">
    <int:queue />
</int:channel>

<int:logging-channel-adapter channel="rolledBackChannel" />


<!-- Channel where message with file name will be dropped by inbound sftp 
    channel adapter -->
<int:channel id="fileMessageChannel">
    <int:queue />
</int:channel>

<!-- Transform spring integration message to job launch request -->
<int:transformer input-channel="fileMessageChannel"
    output-channel="jobRequestChannel" id="jobLaunchMessageTransformer">
    <bean class="com.sftp.test.test.util.FileMessageToJobRequest">
        <property name="job" ref="testJob" />
        <property name="fileParameterName" value="fileName" />
    </bean>
</int:transformer>

<!-- Job request channel -->
<int:channel id="jobRequestChannel">
    <int:queue />
</int:channel>

<!-- The JobLaunchingGateway is used to launch Batch Jobs. Internally it 
    delegates to a JobLaunchingMessageHandler. -->
<batch-integration:job-launching-gateway
    request-channel="jobRequestChannel" reply-channel="jobLaunchReplyChannel" />

<!-- job response channel -->
<int:channel id="jobLaunchReplyChannel">
    <int:queue />
</int:channel>

<!-- Logging response received from job on jobLaunchReplyChannel -->

<int:outbound-channel-adapter channel="jobLaunchReplyChannel"
    ref="fileProcessAdapter" method="moveFile" />

1 Answer

Gary Russell

The problem is that you are using QueueChannels. As soon as you hand off the request to another thread, the poller transaction will "commit".

You need to run the job on the poller thread (remove the <queue/> elements from the channels). Add a task executor to the poller if you want to run multiple jobs concurrently.
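As a rough sketch of that change, applied to the configuration in the question: the three channels lose their <int:queue/> child and become direct channels, so the transformer and the job-launching gateway run on the poller thread, inside the poller's transaction. The "pollerExecutor" bean and its pool size are assumptions added for illustration, and the <task:executor/> element assumes the Spring "task" namespace is declared in the file. This also assumes the job launcher itself runs the job synchronously; otherwise the transaction would still end before the job finishes.

```xml
<!-- Direct channels (no <int:queue/>): each handler is invoked on the
     sending thread, so the whole flow stays in the poller transaction -->
<int:channel id="fileMessageChannel" />
<int:channel id="jobRequestChannel" />
<int:channel id="jobLaunchReplyChannel" />

<!-- Optional and hypothetical: an executor so several polls (and
     therefore several jobs) can run concurrently -->
<task:executor id="pollerExecutor" pool-size="5" />

<!-- Replaces the <int:poller> nested inside the SFTP inbound adapter;
     drop the task-executor attribute to run one job at a time -->
<int:poller cron="${test.file.poll.frequency}"
    max-messages-per-poll="1" task-executor="pollerExecutor">
    <int:transactional transaction-manager="transactionManager"
        synchronization-factory="syncFactory" />
</int:poller>
```

With this layout, an exception thrown anywhere downstream propagates back to the poller thread, the transaction rolls back, and the after-rollback expression in "syncFactory" should move the file to the failed directory.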

Also, in order to reject and rename a file, you can't use an internal filter because filtered files don't produce a message. You could either do the rename within your filter, or use an AcceptAllFileListFilter in the adapter and use a <filter/> in the flow (configured to throw an exception on rejection).
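The second option might look roughly like the fragment below. The bean id "acceptAllFilter", the channel "acceptedFileChannel", and the "fileValidator" bean with its "isValid" method are all hypothetical names chosen for illustration; the validation logic from the custom filter would have to move into such a bean (a method taking the java.io.File payload and returning a boolean).

```xml
<!-- Let every local file through so that each file produces a message.
     On the SFTP adapter, set local-filter="acceptAllFilter" instead of
     local-filter="testFileFilter". -->
<bean id="acceptAllFilter"
    class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />

<!-- Hypothetical validator bean: isValid(File) returns false to reject.
     A rejection throws an exception instead of silently dropping the
     message, which rolls back the poller transaction. -->
<int:filter input-channel="fileMessageChannel"
    output-channel="acceptedFileChannel"
    ref="fileValidator" method="isValid"
    throw-exception-on-rejection="true" />

<!-- Direct channel feeding the existing transformer; its input-channel
     would change from fileMessageChannel to acceptedFileChannel -->
<int:channel id="acceptedFileChannel" />
```

Because the filter then runs on the poller thread, a rejected file and a failed JDBC transaction take the same path: the exception triggers a rollback, and the after-rollback expression handles the move to the failed directory.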