How do I append futures to a BufferUnordered stream?

I'm trying to append futures to the stream underlying a BufferUnordered. At the moment I push them directly into the underlying stream via get_mut(). By that point the Fuse stream inside BufferUnordered is already empty (finished), so pushing to it has no effect and the loop below never receives the third response. Changing the definition of next at (1) to stream.buffer_unordered(1) seems to make it work, presumably because the underlying stream is not yet empty/finished when I push.

extern crate url;
extern crate futures;
extern crate tokio_core;
extern crate reqwest;

use url::Url;
use futures::*;
use tokio_core::reactor::Core;
use reqwest::unstable::async::{Client, Response, Decoder};

fn main() {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle()).unwrap();
    let hyper = client.get("https://hyper.rs").unwrap().send();
    let google = client.get("https://google.com").unwrap().send();
    let stream = stream::futures_unordered(vec![future::ok(hyper), future::ok(google)]);

    let mut next = stream.buffer_unordered(5).into_future(); // (1)
    loop {
        match core.run(next) {

            Ok((None, _something)) => {
                println!("finished");
                break;
            },
            Ok((Some(response), mut next_requests)) => {
                {
                    let inner = next_requests.get_mut();
                    println!("{}", inner.is_empty());
                    println!("{}", response.status());
                    let yahoo = client.get("https://yahoo.com").unwrap().send();
                    inner.push(future::ok(yahoo)); // no effect here
                }
                next = next_requests.into_future();
            }
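            Err((error, next_requests)) => {
                // Report the failed request, then keep polling the remaining ones.
                eprintln!("request failed: {}", error);
                next = next_requests.into_future();
            }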
        }
    }
}

How do I add more futures to BufferUnordered? Do I actually have to chain it or do something along these lines?
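
To make the suspected mechanism concrete, here is a minimal std-only sketch. It is a toy model, not the futures crate's implementation: Source, Fuse, Buffered and run are hypothetical names, items are plain strings instead of futures, and results come out in FIFO order rather than completion order. It assumes only what is described above: the buffer eagerly pulls up to its capacity from a fused source, and the fuse never polls the source again once it has seen None. Unlike the loop above, it pushes the extra item only once so that it terminates.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a pushable source stream (hypothetical type).
struct Source {
    queue: VecDeque<&'static str>,
}

impl Source {
    fn push(&mut self, item: &'static str) {
        self.queue.push_back(item);
    }
    fn next(&mut self) -> Option<&'static str> {
        self.queue.pop_front()
    }
}

/// Toy fuse: once the inner source returns None, it is never polled again.
struct Fuse {
    inner: Source,
    done: bool,
}

impl Fuse {
    fn next(&mut self) -> Option<&'static str> {
        if self.done {
            return None;
        }
        let item = self.inner.next();
        if item.is_none() {
            self.done = true; // latch: later pushes to `inner` are never seen
        }
        item
    }
}

/// Toy buffer: eagerly pulls up to `cap` items from the fused source.
struct Buffered {
    source: Fuse,
    pending: VecDeque<&'static str>,
    cap: usize,
}

impl Buffered {
    fn new(items: Vec<&'static str>, cap: usize) -> Self {
        Buffered {
            source: Fuse { inner: Source { queue: items.into_iter().collect() }, done: false },
            pending: VecDeque::new(),
            cap,
        }
    }
    /// Mirrors the idea of get_mut(): direct access to the innermost source.
    fn get_mut(&mut self) -> &mut Source {
        &mut self.source.inner
    }
    fn next(&mut self) -> Option<&'static str> {
        while self.pending.len() < self.cap {
            match self.source.next() {
                Some(item) => self.pending.push_back(item),
                None => break,
            }
        }
        self.pending.pop_front()
    }
}

/// Drains the buffer, pushing "yahoo" into the source after the first item.
fn run(cap: usize) -> Vec<&'static str> {
    let mut buffered = Buffered::new(vec!["hyper", "google"], cap);
    let mut seen = Vec::new();
    let mut pushed = false;
    while let Some(item) = buffered.next() {
        seen.push(item);
        if !pushed {
            buffered.get_mut().push("yahoo");
            pushed = true;
        }
    }
    seen
}

fn main() {
    println!("{:?}", run(5)); // ["hyper", "google"]
    println!("{:?}", run(1)); // ["hyper", "google", "yahoo"]
}
```

With a capacity of 5 the first call drains both items and hits the end of the source, so the fuse latches before "yahoo" is pushed. With a capacity of 1 the source still holds "google" at the time of the push, so the fuse has not latched and "yahoo" is picked up.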
