Using AWS Java S3 SDK TransferManager to resume an upload from a SFTP stream


Currently, I am triggering an upload from an SFTP server to S3 using AWS's TransferManager in Java's S3 SDK. The way I trigger this upload is given below:

(pseudocode...)

    @Autowired
    TransferManager transferManager;

    @Autowired
    SftpStreamFactory sftpStreamFactory;

    SftpStream sftpStream = sftpStreamFactory.createStream(filePath);
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(sftpStream.getSizeBytes());
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, sftpStream.getStream(), objectMetadata);
    putObjectRequest.setGeneralProgressListener(new UploadBeginEndNotificationListener(uploadRequest, statusNotifier));
    
    transferManager.upload(putObjectRequest);

and here is the definition for SftpStream:

    @AllArgsConstructor
    public class SftpStreamFactory {

        @Getter
        @AllArgsConstructor
        public static class SftpStream {
            private final long sizeBytes;
            private final InputStream stream;
        }

        private final SftpRemoteFileTemplate sftpTemplate;
        private final SftpProperties sftpProperties;

        public SftpStream createStream(Path relativePath) {
            return sftpTemplate.<SftpStream, ChannelSftp>executeWithClient(session -> createStream(session, relativePath));
        }

        SftpStream createStream(ChannelSftp channelSftp, Path relativePath) {
            String path = sftpProperties.getRoot().resolve(relativePath).toString();
            try {
                SftpATTRS fileAttrs = channelSftp.lstat(path);
                long size = fileAttrs.getSize();
                return new SftpStream(size, channelSftp.get(path));
            }
            catch (SftpException e) {
                throw new UncheckedIOException(new NestedIOException("SFTP Error", e));
            }
        }
    }

This method of upload works fine. However, if a multipart upload is paused, cancelled, or otherwise aborted partway through, we would like to pick up where we left off instead of starting over. We are aware of TransferManager's resumeUpload method, which takes a PersistableUpload.

However, the javadoc for PersistableUpload shows that it expects a file path in its constructor, and later tries to create a File object from it: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/PersistableUpload.html

What we are wondering is: is there any way to resume an upload without this File object, which we cannot obtain from our ChannelSftp? That is, can we resume an upload from a stream instead of a file? Or would we have to switch to the low-level S3 APIs to perform such a resume? Any suggestions are appreciated.

Edit - After looking into it a bit more: even when passing an UploadId for an already existing upload, the doUpload method throws an exception if there is no file. Any ideas?

1 Answer

Answered by Fahim Bagar:

The answer is no, you cannot resume an upload without the file, but there is a workaround for a case like yours:

#PAUSING SUCCESSFULLY BEFORE THE UPLOAD IS ABORTED

1. If, and only if, no interruption has occurred yet, try to pause the upload:

        boolean forceCancel = true;
        PauseResult<PersistableUpload> pauseResult = myUpload.tryPause(forceCancel);

2. Save the information needed to resume to a file:

        PersistableUpload persistableUpload = pauseResult.getInfoToResume();

        File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
        if (!f.exists()) {
            f.createNewFile();
        }
        FileOutputStream fos = new FileOutputStream(f);

        // Serialize the persistable upload to the file.
        persistableUpload.serialize(fos);
        fos.close();

3. Continue some time later:

        TransferManager tm = new TransferManager();
        FileInputStream fis = new FileInputStream(new File("UNIQUE-ID-FOR-UPLOADED-FILE"));

        // Deserialize the PersistableUpload information from disk.
        PersistableUpload persistableUpload = PersistableTransfer.deserializeFrom(fis);

        // Call resumeUpload with the PersistableUpload.
        tm.resumeUpload(persistableUpload);

        fis.close();

#SAVING TRANSFER STATE CONTINUOUSLY IN CASE SOMETHING HAPPENS (e.g. a JVM crash)

Pass an S3SyncProgressListener to TransferManager#upload to persist every state change, serializing the transfer data to disk:

    transferManager.upload(putObjectRequest, new S3SyncProgressListener() {

        ExecutorService executor = Executors.newFixedThreadPool(1);

        @Override
        public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {

            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
                        if (!f.exists()) {
                            f.createNewFile();
                        }
                        FileOutputStream fos = new FileOutputStream(f);
                        persistableTransfer.serialize(fos);
                        fos.close();
                    } catch (IOException e) {
                        throw new RuntimeException("Unable to persist transfer to disk.", e);
                    }
                }
            });
        }
    });
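If the file-based workaround is not an option, the low-level fallback the question mentions is viable: with the low-level API, AmazonS3#listParts tells you which parts of an existing multipart upload already reached S3, so you can open a fresh SFTP stream, skip past the bytes those parts cover, and continue with uploadPart and completeMultipartUpload. A minimal sketch of the stream-side bookkeeping is below; the class and method names are hypothetical helpers, not SDK API, and it assumes all completed parts used the same fixed part size:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helpers for resuming via the low-level multipart API.
// The real flow: amazonS3.listParts(...) to count completed parts, then
// skipFully(...) on a fresh SFTP stream, then amazonS3.uploadPart(...)
// for the remaining parts and amazonS3.completeMultipartUpload(...).
public class MultipartResume {

    // Bytes already on S3, assuming every completed part used the same fixed size.
    static long bytesAlreadyUploaded(long partSizeBytes, int completedParts) {
        return partSizeBytes * completedParts;
    }

    // InputStream.skip may skip fewer bytes than requested, so loop until done.
    static void skipFully(InputStream in, long bytesToSkip) throws IOException {
        long remaining = bytesToSkip;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                throw new IOException("Unable to skip past already-uploaded bytes");
            }
            remaining -= skipped;
        }
    }
}
```

Note that this restarts the SFTP read from the beginning and discards the already-uploaded prefix; whether that is cheaper than re-uploading depends on how far the transfer got and on SFTP versus S3 bandwidth.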

Hope it helps.