I download a tar.gz file with libcurl and need to decompress the file while downloading — that is, as soon as a chunk of the file is downloaded, decompress that chunk immediately, instead of decompressing the whole file once the download has finished. Are there any C/C++ libraries that meet my requirements?
I tried to use libarchive to extract the file, but it returned "truncated gzip input" when extracting the first chunk of the file. It seems that libarchive needs the whole file in order to extract it. Here is my code. I am not sure whether I used libarchive correctly, as I am new to it.
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <atomic>
#include <thread>
// libarchive
#include <archive.h>
#include <archive_entry.h>
#include <curl/curl.h>
// One downloaded chunk, handed to libarchive's custom read callback.
// Non-owning view: buffer and size are allocated in main() and shared
// with curldata (the curl write callback fills them, this side reads them).
struct mydata {
void *buffer;   // points at the shared 16 KiB chunk buffer
ssize_t *size;  // number of valid bytes currently in buffer
};
// State for the curl write callback. buffer/size alias the same storage
// as mydata so the downloader and the extractor exchange chunks in place.
struct curldata {
void *buffer;   // shared chunk buffer (same allocation as mydata::buffer)
ssize_t *size;  // shared byte count (same allocation as mydata::size)
CURL *curl;     // easy handle, needed to pause the transfer from the callback
};
// Handshake flag between the curl (download) thread and the libarchive
// (extract) thread:
//   true  => the reader has consumed the buffer; curl may write the next chunk
//   false => a chunk is pending for the libarchive reader
std::atomic<bool> rd(true);
struct archive *archive, *archivefd; // NOTE(review): archivefd is only used by commented-out code
std::atomic<bool> start_read(false); // NOTE(review): never written or read in this file -- presumably vestigial
// Custom read callback handed to archive_read_open(): hands libarchive the
// chunk most recently downloaded by curlWriteFunction().
// Returns the number of bytes made available through *block.
la_ssize_t libarchiveRead(struct archive* a, void* client_data, const void** block)
{
    (void)a;
    mydata *my_data = static_cast<mydata*>(client_data);
    // BUG FIX: the original returned 0 whenever no chunk was pending
    // (rd == true). libarchive treats a 0 return as EOF, so it saw a
    // truncated stream after the first chunk -- the exact "truncated gzip
    // input" error reported. Block until the curl thread publishes the
    // next chunk instead of signalling a premature EOF.
    while (rd) {
        std::this_thread::yield();
    }
    // TODO(review): there is still no way to signal a *real* EOF when the
    // download completes; a separate "download finished" flag is needed so
    // this can return 0 exactly once at the true end of stream.
    std::cout << "calling custom read(), size " << *(my_data->size) << std::endl;
    *block = my_data->buffer;
    rd = true;  // hand the (now consumed) buffer back to the curl thread
    return *(my_data->size);
}
int libarchiveClose(struct archive* a, void* client_data)
{
std::cout << "calling custom close() for archive" << std::endl;
mydata *my_data = (mydata*)client_data;
delete my_data;
return (ARCHIVE_OK);
}
int libarchiveClosefd(struct archive* a, void* client_data)
{
std::cout << "calling custom close() for archivefd" << std::endl;
mydata *my_data = (mydata*)client_data;
delete my_data;
return (ARCHIVE_OK);
}
static size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb, void *write_data) {
//size is always 1
curldata *my_data = (curldata*)(write_data);
*(my_data->size) = nmemb * size;
std::cout << "calling curlWriteFunction(), size: " << size << " , nmemb: " << nmemb
<< " , my_data->size: " << *(my_data->size) << std::endl;
memcpy(my_data->buffer, ptr, *(my_data->size));
curl_easy_pause(my_data->curl, CURL_WRITEFUNC_PAUSE);
rd=false;
return (*(my_data->size));
}
static size_t progress(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow) {
CURL *curl = (CURL *)clientp;
(void)ultotal;
(void)ulnow;
if(dltotal == 0) {
return 0;
}
if(rd) {
curl_easy_pause(curl, CURLPAUSE_CONT);
std::cout << "progress: " << dlnow/dltotal * 100 << "%" << std::endl;
}
return 0;
}
void readarchive(void *client_data) {
struct archive_entry *entry;
int flags = ARCHIVE_EXTRACT_TIME;
flags |= ARCHIVE_EXTRACT_PERM;
flags |= ARCHIVE_EXTRACT_ACL;
flags |= ARCHIVE_EXTRACT_FFLAGS;
while(rd);
std::cout << "calling archive_read_open for archive.." << std::endl;
int res = archive_read_open(archive,
client_data,
nullptr,
(archive_read_callback*)libarchiveRead,
(archive_close_callback*)libarchiveClose);
std::cout << "called archive_read_open for archive.." << std::endl;
res = archive_read_next_header(archive, &(entry));
while(res == ARCHIVE_OK ) {
std::cout << "Extracting for archive " << archive_entry_pathname(entry) << "..." << std::endl;
// extract current entry
archive_read_extract(archive, entry, flags);
// read next if available
res = archive_read_next_header(archive, &(entry));
}
std::cout << "archive_read_next_header for archive failed, errcode: " << res << " error: " << archive_error_string(archive) << std::endl;
}
//size_t curlWriteFunction(void *ptr, size_t size, size_t nmemb,FILE* fptr) {
// //size is always 1
// std::cout << "calling curlWriteFunction().." << std::endl;
// return fwrite(ptr, size, nmemb, fptr);
//}
// Usage: prog -r <url>  -- stream-download <url> with curl and extract it
// with libarchive on a second thread as chunks arrive.
int main(int argc, char** argv) {
    if (argc < 3)
    {
        std::cout << argv[0] << "{-r | -w} file[s]" << std::endl;
        return 1;
    }
    std::vector<std::string> filenames;
    filenames.reserve(argc);
    while (*++argv != nullptr)
    {
        filenames.emplace_back(*argv);
    }
    bool modeRead = (filenames[0] == "-r");
    std::cout << filenames[0] << " " << filenames[1] << std::endl;
    if (modeRead)
    {
        archive = archive_read_new();
        archive_read_support_filter_gzip(archive);
        archive_read_support_format_tar(archive);
        // Shared 16 KiB chunk buffer: the curl write callback fills it, the
        // libarchive read callback consumes it. mydata/curldata both alias it.
        mydata *client_data = new mydata();
        char *buff1 = new char[16 * 1024];
        client_data->size = new ssize_t;
        *(client_data->size) = 0;
        client_data->buffer = buff1;
        curldata *curl_data = new curldata();
        curl_data->size = client_data->size;
        curl_data->buffer = buff1;
        CURL *curl = curl_easy_init();
        curl_data->curl = curl;
        curl_easy_setopt(curl, CURLOPT_URL, filenames[1].c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, curl_data);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriteFunction);
        curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
        curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, curl);
        curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progress);
        // Extraction runs on its own thread while curl downloads on this one.
        std::thread t(readarchive, client_data);
        CURLcode result = curl_easy_perform(curl);
        if (result != CURLE_OK) {
            std::cout << "curl perform failed, errcode; " << result << " err: " << curl_easy_strerror(result) << std::endl;
        }
        t.join();
        // client_data itself is deleted by the libarchiveClose close callback;
        // only free the pieces that callback does not own.
        delete client_data->size;
        delete []buff1;
        archive_read_close(archive);
        // BUG FIX: archive_read_free() was called twice in the original,
        // which is a double free of the archive object. Free exactly once.
        archive_read_free(archive);
        curl_easy_cleanup(curl);
    }
    return 0;
}
libarchive hooks into external programs for some compression algorithms, including gunzip. You can't just pass it a chunk of a file and get it to decompress it, but you can pass it a file descriptor so it pipes data into gunzip then back through libarchive to decompress and untar an archive on the fly. Here is a quick example program without any error handling that takes a local compressed tar (documentation says this could also be a socket), assigns it a file descriptor and extracts it straight to disk. I've passed gigabyte sized files through it and it only ever uses a few KB of memory so it seems to have the functionality you want. I used a buffer size of 512 as archive files are made up of 512 byte blocks but this could be larger:
EDIT
After testing this with a socket, turns out it isn't really possible using libarchive despite its (very limited) documentation seeming to claim it is. It works fine with uncompressed tars, but with compressed tars, libarchive uses lookahead code to determine the compression format that requires the entire file on the call to archive_read_open_fd. Even disabling the lookahead code and manually specifying the compression program with archive_read_support_filter_program didn't work for me. You could do archive_read_open on the client, then upload the input_file archive struct to the server ahead of the data, but it seems you're only handling one side of the download so that wouldn't work.
I ended up not using libarchive at all to achieve this. Since I'm developing on Linux, I fork to tar and pipe data read from the socket into it. Here's a highly simplified example with no error handling or the required socket code: