How to download a large file with hyper and resume on error?


I want to download large files (500 MB) with hyper, and be able to resume if the download fails.

Is there any way with hyper to run some function for each chunk of data received? The send() method returns a Result<Response>, but I can't find any methods on Response that return an iterator over chunks. Ideally I'd be able to do something like:

client.get(&url.to_string())
    .send()
    .map(|mut res| {
        let mut chunk = String::new();
        // write this chunk to disk
    });

Is this possible, or will map only be called once hyper has downloaded the entire file?


1 Answer

ArtemGr (Best Answer)

Is there any way with hyper to run some function for each chunk of data received?

Hyper's Response implements Read. This means the Response is a stream, and you can read arbitrary chunks of data from it as you would with any other stream.

For what it's worth, here's a piece of code I use to download large files from ICECat. I'm using the Read interface in order to display the download progress in the terminal.

The variable response here is an instance of Hyper's Response.

{
    let mut file = try_s!(fs::File::create(&tmp_path));
    let mut deflate = try_s!(GzDecoder::new(response));

    let mut buf = [0; 128 * 1024];
    let mut written = 0;
    loop {
        status_line! ("icecat_fetch] " (url) ": " (written / 1024 / 1024) " MiB.");
        let len = match deflate.read(&mut buf) {
            Ok(0) => break,  // EOF.
            Ok(len) => len,
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return ERR!("{}: Download failed: {}", url, err),
        };
        try_s!(file.write_all(&buf[..len]));
        written += len;
    }
}

try_s!(fs::rename(tmp_path, target_path));
status_line_clear();

I want to download large files (500mb) with hyper, and be able to resume if the download fails.

This is usually implemented with the HTTP "Range" header (cf. RFC 7233).

Not every server out there supports the "Range" header. I've seen a lot of servers with a custom HTTP stack that lack proper "Range" support, or that have the "Range" header disabled for some reason. So reading and discarding the already-downloaded prefix of the Response stream might be a necessary fallback.

But if you want to speed things up and save traffic, the primary means of resuming a stopped download should be the "Range" header.


P.S. With Hyper 0.12, the response body returned by Hyper is a Stream, and to run a function for each chunk of data received we can use the for_each stream combinator:

extern crate futures;
extern crate futures_cpupool;
extern crate hyper; // 0.12
extern crate hyper_rustls;

use futures::Future;
use futures_cpupool::CpuPool;
use hyper::rt::Stream;
use hyper::{Body, Client, Request};
use hyper_rustls::HttpsConnector;
use std::thread;
use std::time::Duration;

fn main() {
    let url = "https://steemitimages.com/DQmYWcEumaw1ajSge5PcGpgPpXydTkTcqe1daF4Ro3sRLDi/IMG_20130103_103123.jpg";

    // In real life we'd want an asynchronous reactor, such as tokio_core, but for a short example the `CpuPool` should do.
    let pool = CpuPool::new(1);
    let https = HttpsConnector::new(1);
    let client = Client::builder().executor(pool.clone()).build(https);

    // `unwrap` is used because there are different ways (and/or libraries) to handle the errors and you should pick one yourself.
    // Also to keep this example simple.
    let req = Request::builder().uri(url).body(Body::empty()).unwrap();
    let fut = client.request(req);

    // Rebinding (shadowing) the `fut` variable allows us (in smart IDEs) to more easily examine the gradual weaving of the types.
    let fut = fut.then(move |res| {
        let res = res.unwrap();
        println!("Status: {:?}.", res.status());
        let body = res.into_body();
        // `for_each` returns a `Future` that we must embed into our chain of futures in order to execute it.
        body.for_each(move |chunk| {println!("Got a chunk of {} bytes.", chunk.len()); Ok(())})
    });

    // Handle the errors: we need error-free futures for `spawn`.
    let fut = fut.then(move |r| -> Result<(), ()> {r.unwrap(); Ok(())});

    // Spawning the future onto a runtime starts executing it in background.
    // If not spawned onto a runtime the future will be executed in `wait`.
    // 
    // Note that we should keep the future around.
    // To save resources most implementations would *cancel* the dropped futures.
    let _fut = pool.spawn(fut);

    thread::sleep (Duration::from_secs (1));  // or `_fut.wait()`.
}