transactionSynchronizationFactory in combination with database transaction not working

54 views Asked by At

I expect my code to pick up a file via the poller once it is created in the remote directory, then perform a database insert, and finally move the file to another remote directory.

So far, the file is polled and the database insert succeeds, but the remote file is deleted from the source directory and is NOT moved to the other remote directory. I cannot figure out what is wrong here.

Spring Boot version: 3.2.1; database: Postgres

My Java Configuration:

  /**
   * Creates the FTPS session factory used to reach the remote server.
   *
   * <p>Host, port and credentials are taken from the {@code ftpHost}, {@code ftpPort},
   * {@code ftpUsername} and {@code ftpPassword} members of the enclosing configuration.
   * The factory is switched to implicit mode with the {@code "TLS"} protocol and a UTF-8
   * control encoding.
   *
   * @return the configured session factory
   */
  @Bean
  public DefaultFtpsSessionFactory ftpsSessionFactory() {
    var ftps = new DefaultFtpsSessionFactory();

    // Where to connect.
    ftps.setHost(ftpHost);
    ftps.setPort(ftpPort);

    // Who connects.
    ftps.setUsername(ftpUsername);
    ftps.setPassword(ftpPassword);

    // How the connection is secured and encoded.
    ftps.setProtocol("TLS");
    ftps.setImplicit(true);
    ftps.setControlEncoding(StandardCharsets.UTF_8.name());

    return ftps;
  }

  /**
   * Builds the synchronizer that pulls {@code *.txt} files from the {@code /remote-in}
   * directory over the session factory returned by {@link #ftpsSessionFactory()}.
   *
   * <p>{@code setDeleteRemoteFiles(true)} is enabled here, i.e. the synchronizer is asked to
   * remove the remote file; nothing in this method moves it to another remote directory.
   *
   * @return the configured inbound file synchronizer
   */
  @Bean
  public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
    var synchronizer = new FtpInboundFileSynchronizer(ftpsSessionFactory());
    synchronizer.setRemoteDirectory("/remote-in");
    synchronizer.setFilter(new FtpSimplePatternFileListFilter("*.txt"));
    synchronizer.setDeleteRemoteFiles(true);
    return synchronizer;
  }

  /**
   * Declares the polled message source that feeds {@code ftpChannel}.
   *
   * <p>The source wraps {@link #ftpInboundFileSynchronizer()} and uses {@code /tmp} as its
   * local directory (created automatically if missing). Local files pass through an
   * {@code AcceptOnceFileListFilter}, and the max fetch size is set to
   * {@code Integer.MIN_VALUE}. Polling is driven by the {@code pollerMetadata} bean.
   *
   * @return the message source whose payloads are {@link File} instances
   */
  @Bean
  @InboundChannelAdapter(channel = "ftpChannel",
      poller = @Poller(value = "pollerMetadata"))
  public MessageSource<File> ftpMessageSource() {
    var messageSource =
        new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());

    // Local side of the synchronization.
    messageSource.setAutoCreateLocalDirectory(true);
    messageSource.setLocalDirectory(new File("/tmp"));
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<>());

    messageSource.setMaxFetchSize(Integer.MIN_VALUE);
    return messageSource;
  }

  /**
   * Defines the poller referenced by name from the inbound channel adapter.
   *
   * <p>It is a fixed-delay poller (delay taken from {@code ftpPollerDelay}) that is advised
   * by {@link #transactionInterceptor()} and carries the synchronization factory from
   * {@link #transactionSynchronizationFactory()}.
   *
   * @return the poller metadata
   */
  @Bean
  public PollerMetadata pollerMetadata() {
    var pollerSpec = Pollers.fixedDelay(ftpPollerDelay)
        .advice(transactionInterceptor())
        .transactionSynchronizationFactory(transactionSynchronizationFactory());
    return pollerSpec.getObject();
  }

  /**
   * Registers the JPA transaction manager used by the poller's transaction advice.
   *
   * <p>The manager is created with its no-argument constructor; nothing is set on it here.
   *
   * @return a new {@code JpaTransactionManager}
   */
  @Bean
  public JpaTransactionManager transactionManager() {
    JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
    return jpaTransactionManager;
  }

  /**
   * Builds the transaction advice applied to the poller, bound to
   * {@link #transactionManager()}.
   *
   * <p>The bean is registered under the explicit name {@code jpaTransactionInterceptor}
   * rather than the method-derived default. The default name, {@code transactionInterceptor},
   * is also the name of the interceptor bean that Spring's annotation-driven transaction
   * configuration registers, so the two definitions clash and the poller may end up advised
   * by the framework's interceptor instead of this one. The Java method name is left
   * unchanged so existing in-class callers keep compiling.
   *
   * @return the interceptor that wraps each poll in a transaction
   */
  @Bean("jpaTransactionInterceptor")
  public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
        .transactionManager(transactionManager())
        .build();
  }

  /**
   * Creates the synchronization factory whose processor evaluates a SpEL expression after
   * the transaction commits.
   *
   * <p>The expression calls {@code renameTo} on the message payload, targeting a
   * {@code java.io.File} whose path is the value looked up under
   * {@code ftp.directory.archive}, a file separator, and the payload's name.
   * NOTE(review): the payload is presumably the locally synchronized {@code java.io.File},
   * so this renames a local file rather than anything on the remote server. Confirm against
   * the message source.
   *
   * @return a factory wrapping the expression-evaluating processor
   */
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    String archiveAfterCommit =
        "payload.renameTo(new java.io.File"
            + "(#systemProperties.get('ftp.directory.archive' )  + "
            + "T(java.io.File).separator + payload.name))";

    var syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setAfterCommitExpression(
        new SpelExpressionParser().parseExpression(archiveAfterCommit));
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
  }

  /**
   * Handles messages arriving on {@code ftpChannel}.
   *
   * <p>Payloads that are not a {@link File} are logged as errors and otherwise ignored.
   * The branch for {@link File} payloads is where the database record is meant to be saved.
   *
   * @return the handler subscribed to {@code ftpChannel}
   */
  @Bean
  @ServiceActivator(inputChannel = "ftpChannel")
  public MessageHandler ftpInboundMessageHandler() {
    return message -> {
      Object payload = message.getPayload();
      if (!(payload instanceof File file)) {
        // Anything other than a File is unexpected on this channel.
        log.error("Received invalid message payload {}", payload);
        return;
      }
      // save record in db
    };
  }

1

There is 1 answer

0
riteshmaurya On

I resolved this by changing the interceptor bean name:

  /**
   * Transaction advice for the poller, registered under the bean name
   * {@code jpaTransactionInterceptor}.
   *
   * <p>It is built from the {@code transactionManager} member of the enclosing
   * configuration.
   *
   * @return the interceptor to pass to the poller as advice
   */
  @Bean
  public TransactionInterceptor jpaTransactionInterceptor() {
    var interceptorBuilder =
        new TransactionInterceptorBuilder().transactionManager(transactionManager);
    return interceptorBuilder.build();
  }

 /**
   * Fixed-delay poller (delay from {@code ftpPollerDelay}) advised by
   * {@link #jpaTransactionInterceptor()} and equipped with the factory from
   * {@link #transactionSynchronizationFactory()}.
   *
   * @return the poller metadata
   */
  @Bean
  public PollerMetadata pollerMetadata() {
    var pollerSpec = Pollers.fixedDelay(ftpPollerDelay)
        .advice(jpaTransactionInterceptor())
        .transactionSynchronizationFactory(transactionSynchronizationFactory());
    return pollerSpec.getObject();
  }

I also provided a BeanFactory to the ExpressionEvaluatingTransactionSynchronizationProcessor, to avoid the warning about a missing bean factory that is logged while the TransactionSynchronizationFactory bean is created:

  /**
   * Synchronization factory bean backed by an expression-evaluating processor.
   *
   * <p>The processor is handed the bean factory obtained from {@code applicationContext}
   * before being wrapped. No commit or rollback expressions are set in this method.
   *
   * @return a factory wrapping the processor
   */
  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    var syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
  }