C/C++ library to decompress tar.gz file while downloading

388 views

I download a tar.gz file with libcurl, and need to decompress the file while downloading, that is, when a chunk of the file is downloaded, decompress that chunk immediately, instead of decompressing the whole file once the whole file is downloaded. Are there any C/C++ libraries that meet my requirements?

I tried to use libarchive to extract the file, but it returned "truncated gzip input" when extracting the first chunk of the file. It seems that libarchive needs the whole file to extract it. Here is my code. I am not sure if I used libarchive correctly as I am new to it.

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <atomic>
#include <thread>

// libarchive
#include <archive.h>
#include <archive_entry.h>
#include <curl/curl.h>

// Chunk handed from the curl download thread to the libarchive read callback.
struct mydata {
  void *buffer;   // shared 16 KiB chunk buffer (same storage curl writes into)
  ssize_t *size;  // number of valid bytes currently in buffer
};
// State for the curl write callback: the same shared buffer/size as mydata,
// plus the curl handle so the callback can pause the transfer.
struct curldata {
    void *buffer;
    ssize_t *size;
    CURL *curl;   // easy handle, used to pause the download after each chunk
};

// Handshake flag between the curl thread and the libarchive reader thread:
//   true  -> buffer has been consumed / is empty (curl may write the next chunk)
//   false -> buffer holds a fresh chunk waiting for the libarchive reader
std::atomic<bool> rd(true);
// Global archive handles; only `archive` is used, `archivefd` belongs to the
// commented-out fd-based experiment further down.
struct archive *archive, *archivefd;
std::atomic<bool> start_read(false);  // NOTE(review): never read anywhere visible
// libarchive read callback: hand the most recent downloaded chunk to the
// decompressor. Returns the number of valid bytes and points *block at the
// shared buffer.
//
// Fix: the original returned 0 whenever no chunk was ready (rd == true).
// libarchive treats a 0-length read as end-of-archive, which is why it
// reported "truncated gzip input" after the first chunk. Instead, wait until
// the curl write callback publishes the next chunk (rd == false).
// NOTE(review): there is still no end-of-download signal, so at real EOF this
// waits forever — a dedicated "download finished" flag is needed to return 0.
la_ssize_t libarchiveRead(struct archive* a, void* client_data, const void** block)
{
    while (rd) {
        std::this_thread::yield();  // wait for curl to publish a chunk
    }
    mydata *my_data = (mydata*)client_data;
    std::cout << "calling custom read(), size " << *(my_data->size) << std::endl;
    *block = my_data->buffer;
    rd = true;  // mark the buffer consumed so curl may overwrite it
    return *(my_data->size);
}

int libarchiveClose(struct archive* a, void* client_data)
{
    std::cout << "calling custom close() for archive" << std::endl;
    mydata *my_data = (mydata*)client_data;
    delete my_data;
    return (ARCHIVE_OK);
}

int libarchiveClosefd(struct archive* a, void* client_data)
{
    std::cout << "calling custom close() for archivefd" << std::endl;
    mydata *my_data = (mydata*)client_data;
    delete my_data;
    return (ARCHIVE_OK);
}
static size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb, void *write_data) {
    //size is always 1
    curldata *my_data = (curldata*)(write_data);
    *(my_data->size) = nmemb * size;
    std::cout << "calling curlWriteFunction(), size: " << size << " , nmemb: " << nmemb
    << " , my_data->size: " << *(my_data->size) << std::endl;
    memcpy(my_data->buffer, ptr, *(my_data->size));
    curl_easy_pause(my_data->curl, CURL_WRITEFUNC_PAUSE);
    rd=false;
    return (*(my_data->size));
}


 static size_t progress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) {
    CURL *curl = (CURL *)clientp;
    (void)ultotal;
    (void)ulnow;
    if(dltotal == 0) {
        return 0;
    }
    if(rd) {

        curl_easy_pause(curl, CURLPAUSE_CONT);
        std::cout << "progress: " << dlnow/dltotal * 100 << "%" << std::endl;
    }
    return 0;
 }

void readarchive(void *client_data) {
    struct archive_entry    *entry;
    int flags = ARCHIVE_EXTRACT_TIME;
    flags |= ARCHIVE_EXTRACT_PERM;
    flags |= ARCHIVE_EXTRACT_ACL;
    flags |= ARCHIVE_EXTRACT_FFLAGS;
    while(rd);
    std::cout << "calling archive_read_open for archive.." << std::endl;
    int res = archive_read_open(archive,
                                client_data,
                                nullptr,
                                (archive_read_callback*)libarchiveRead,
                                (archive_close_callback*)libarchiveClose);
    std::cout << "called archive_read_open for archive.." << std::endl;
    res = archive_read_next_header(archive, &(entry));
    while(res == ARCHIVE_OK ) {
        std::cout << "Extracting for archive " << archive_entry_pathname(entry) << "..." << std::endl;
        // extract current entry
        archive_read_extract(archive, entry, flags);
        // read next if available
        res = archive_read_next_header(archive, &(entry));
    }
    std::cout << "archive_read_next_header for archive failed, errcode: " << res << " error: " << archive_error_string(archive) << std::endl;
}

//size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb,FILE* fptr) {
//    //size is always 1
//    std::cout << "calling curlWriteFunction().." << std::endl;
//    return fwrite(ptr, size, nmemb, fptr);
//}
int main(int argc, char** argv) {

    if(argc < 3)
    {
        std::cout << argv[0] << "{-r | -w} file[s]" << std::endl;
        return 1;
    }

    std::vector<std::string> filenames;
    filenames.reserve(argc);

    while (*++argv != nullptr)
    {
        filenames.emplace_back(*argv);
    }

    bool modeRead = (filenames[0] == "-r");
    std::cout << filenames[0] << " " << filenames[1] << std::endl;

    // archive related variables

    char buff_archive[16 * 1024], buff_archivefd[16 * 1024];

    if(modeRead)
    {

        archive = archive_read_new();
        archive_read_support_filter_gzip(archive);
        archive_read_support_format_tar(archive);

        mydata *client_data = new mydata();
        int res;
        char *buff1 = new char[16 * 1024];

        client_data->size = new ssize_t;
        *(client_data->size) = 0;
        client_data->buffer = buff1;

        curldata *curl_data = new curldata();
        curl_data->size=client_data->size;
        curl_data->buffer=buff1;

        CURL *curl = curl_easy_init();
        curl_data->curl = curl;
        curl_easy_setopt(curl, CURLOPT_URL, filenames[1].c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, curl_data);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriteFunction);
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
        curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, curl);
        curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION,progress);
        std::thread t(readarchive, client_data);
        CURLcode result = curl_easy_perform(curl);
        if(result != CURLE_OK) {
            std::cout << "curl perform failed, errcode; " << result << " err: " << curl_easy_strerror(result) << std::endl;
        }
        //std::cout << "calling archive_read_open for archivefd.." << std::endl;
        //res = archive_read_open(archivefd,
        //                            client_datafd,
        //                            nullptr,
        //                            (archive_read_callback*)libarchiveReadfd,
        //                            (archive_close_callback*)libarchiveClosefd);
        //std::cout << "called archive_read_open for archivefd.." << std::endl;

        //res = archive_read_next_header(archivefd, &(entry));
        //if (res != ARCHIVE_OK) {
        //    std::cout << "archive_read_next_header for archivefd failed, errcode: " << res << " error: " << archive_error_string(archivefd) << std::endl;
        //}
        //while(res == ARCHIVE_OK) {
        //    std::cout << "Extracting for archivefd " << archive_entry_pathname(entry) << "..." << std::endl;
        //    // extract current entry
        //    archive_read_extract(archivefd, entry, flags);
        //    // read next if available
        //    res = archive_read_next_header(archivefd, &(entry));
        //}
        t.join();
        delete client_data->size;
        delete []buff1;
        archive_read_close(archive);
        archive_read_free(archive);
        archive_read_free(archive);
        curl_easy_cleanup(curl);
    }


    return 0;

}
1

There is 1 answer

0
Bearded Penguin On

libarchive hooks into external programs for some compression algorithms, including gunzip. You can't just pass it a chunk of a file and get it to decompress it, but you can pass it a file descriptor so it pipes data into gunzip then back through libarchive to decompress and untar an archive on the fly. Here is a quick example program without any error handling that takes a local compressed tar (documentation says this could also be a socket), assigns it a file descriptor and extracts it straight to disk. I've passed gigabyte sized files through it and it only ever uses a few KB of memory so it seems to have the functionality you want. I used a buffer size of 512 as archive files are made up of 512 byte blocks but this could be larger:

#include <archive.h>
#include <archive_entry.h>
#include <fcntl.h>
#include <unistd.h>

// Stream-extract a local compressed tar through libarchive: read fixed-size
// chunks from the input archive and write each entry straight to disk.
int main() {
  const char *input_name = "compressed.tgz";
  struct archive *input_file = archive_read_new();
  struct archive_entry *entry;
  struct archive *output_file = archive_write_disk_new();

  int flags = ARCHIVE_EXTRACT_ACL;  // Attempt to restore Access Control lists.
  flags |= ARCHIVE_EXTRACT_FFLAGS;  // Attempt to restore file attributes.
  flags |= ARCHIVE_EXTRACT_OWNER;   // User and group IDs set on the file.
  flags |= ARCHIVE_EXTRACT_PERM;    //  Full permissions (including SGID, SUID, and sticky bits) are restored.
  flags |= ARCHIVE_EXTRACT_TIME;    // Timestamps (mtime, ctime, and atime) are restored.
  flags |= ARCHIVE_EXTRACT_UNLINK;  // Existing files on disk will be unlinked before creating them.
  flags |= ARCHIVE_EXTRACT_XATTR;   // Attempt to restore extended file attributes.

  archive_write_disk_set_options(output_file, flags);
  archive_write_disk_set_standard_lookup(output_file);
  archive_read_support_filter_all(input_file);
  archive_read_support_format_all(input_file);
  int fd = open(input_name, O_RDONLY);
  archive_read_open_fd(input_file, fd, 512);

  char buffer[512];
  size_t buffer_size = 512;
  ssize_t remaining;
  while (archive_read_next_header(input_file, &entry) != ARCHIVE_EOF) {
    archive_write_header(output_file, entry);
    remaining = archive_read_data(input_file, buffer, buffer_size);
    while (remaining > 0) {
      // Fix: write only the bytes actually read. The original wrote
      // buffer_size bytes every time, appending stale buffer contents on the
      // final short read of each entry (archive_read_data may return less
      // than buffer_size).
      archive_write_data(output_file, buffer, remaining);
      remaining = archive_read_data(input_file, buffer, buffer_size);
    }
  }

  archive_read_free(input_file);
  archive_write_free(output_file);
  close(fd);  // the original leaked the input file descriptor
}

EDIT

After testing this with a socket, turns out it isn't really possible using libarchive despite its (very limited) documentation seeming to claim it is. It works fine with uncompressed tars, but with compressed tars, libarchive uses lookahead code to determine the compression format that requires the entire file on the call to archive_read_open_fd. Even disabling the lookahead code and manually specifying the compression program with archive_read_support_filter_program didn't work for me. You could do archive_read_open on the client, then upload the input_file archive struct to the server ahead of the data, but it seems you're only handling one side of the download so that wouldn't work.

I ended up not using libarchive at all to achieve this. Since I'm developing on Linux, I fork to tar and pipe data read from the socket into it. Here's a highly simplified example with no error handling or the required socket code:

// Need to include fcntl.h, unistd.h and sys/wait.h

// Fork an external `tar` process and stream downloaded bytes into its stdin
// through a pipe, so decompression/extraction happens on the fly without
// buffering the whole archive. (Fragment: socket setup is omitted.)
int extract_pipe[2]{-1, -1};
pipe(extract_pipe);
pid_t pid = fork();

if (pid == 0) {
  // Child process.

  dup2(extract_pipe[0], STDIN_FILENO);  // Duplicate read end of pipe to stdin.
  close(extract_pipe[0]);
  close(extract_pipe[1]);

  // Pipe data into tar through stdin.
  // If the compression program is not installed, execlp just fails.
  execlp("tar", "tar", "--gzip", "-xC", "/output/path", static_cast<char *>(0));
  // Shouldn't get here.
  exit(EXIT_FAILURE);

} else {
  // Parent process.

  // Has been redirected to child process stdin, so fd can now be closed.
  close(extract_pipe[0]);
  int socket_fd;  // Assuming this is set to a socket fd with accept().
  size_t download_size;  // Assuming this is set by header information.
  const size_t kReadSize = 1024;  // Size of chunks your file is downloaded in.
  // NOTE(review): signed int compared against unsigned size_t below, and
  // splice() can return -1 on error, which would corrupt this counter —
  // real code needs a ssize_t check on each splice() result.
  int bytes_uploaded = 0;
  while (bytes_uploaded < download_size) {
    // splice() moves bytes kernel-side from the socket into the pipe,
    // avoiding a copy through user space.
    bytes_uploaded += splice(socket_fd, NULL, extract_pipe[1], NULL, kReadSize, 0);
  }
  close(extract_pipe[1]);  // Close pipe to send EOF flag to tar.
  int process_status;
  // Hang until process finished, use WNOHANG flag if you don't want to do this.
  (void)waitpid(pid, &process_status, 0);  
  if (WIFEXITED(process_status) && (WEXITSTATUS(process_status) == 0)) 
  {  // Tar succeeded.
  } else {
    // Tar failed.
  }
}