I have this code:
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.17
use futures::future::ok as fut_ok;
use futures::Future;
use tokio::runtime::Builder;
use std::sync::{Arc, Mutex};
extern crate serde_json;
type Error = ();
fn questions_data(
val: i32,
) -> Box<Future<Item = serde_json::Value, Error = actix_web::error::Error>> {
let f = std::fs::read_to_string("auth_token").unwrap();
let token = f.trim();
use actix_web::{client, HttpMessage};
use std::time::Duration;
Box::new(
client::ClientRequest::get(
"https://jsonplaceholder.typicode.com/todos/".to_owned() + &val.to_string(),
)
.header(
actix_web::http::header::AUTHORIZATION,
"Bearer ".to_owned() + token,
)
.finish()
.unwrap()
.send()
.timeout(Duration::from_secs(30))
.map_err(actix_web::error::Error::from) // <- convert SendRequestError to an Error
.and_then(|resp| {
resp.body().limit(67_108_864).from_err().and_then(|body| {
let resp: serde_json::Value = serde_json::from_slice(&body).unwrap();
fut_ok(resp)
})
}),
)
}
fn main() {
let num_workers = 8;
let mut core = Builder::new().core_threads(num_workers).build().unwrap();
let results = Arc::new(Mutex::new(Vec::new()));
for n in 1..100 {
let res = results.clone();
core.spawn(questions_data(n).map(move |n| {
res.lock().unwrap().push(n);
}));
}
core.shutdown_on_idle().wait().unwrap();
let data = results.lock().unwrap();
println!("{:?}", *data);
}
[dependencies]
futures = "0.1.25"
tokio-core = "0.1.17"
futures-util = "0.2.1"
tokio = "0.1.11"
rand = "0.6.0"
actix-web = "0.7.14"
actix = "0.7.6"
env_logger = "0.6.0"
serde_json = "1.0.33"
I get the following errors when I run `cargo run`:
error[E0271]: type mismatch resolving `<std::boxed::Box<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>> as futures::Future>::Error == ()`
--> src/main.rs:52:14
|
52 | core.spawn(Box::new(questions_data(n).map(move |n| {
| ^^^^^ expected struct `actix_web::Error`, found ()
|
= note: expected type `actix_web::Error`
found type `()`
error[E0277]: `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>` cannot be sent between threads safely
--> src/main.rs:52:14
|
52 | core.spawn(Box::new(questions_data(n).map(move |n| {
| ^^^^^ `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>` cannot be sent between threads safely
|
= help: the trait `std::marker::Send` is not implemented for `dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>`
= note: required because it appears within the type `std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>`
= note: required because it appears within the type `futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>>`
= note: required because it appears within the type `std::boxed::Box<futures::Map<std::boxed::Box<dyn futures::Future<Item=serde_json::Value, Error=actix_web::Error>>, [closure@src/main.rs:52:51: 54:10 res:_]>>`
Similar code that does not use the actix-web client works: https://play.rust-lang.org/?version=stable&mode=debug&edition=2015&gist=e81185a73fcb40a3426875573c78accd
EDIT:
Maybe I need something like `map_err(|_| ())`, but I don't know how to apply it.
As `rustc` says, there are two errors here.

The first is an error type mismatch: the spawned future must have `Error = ()`, but yours has `Error = actix_web::Error`. It can be fixed with `map_err`, as you already noticed, e.g. by appending `.map_err(|_| ())` to the future you pass to `spawn`.

The second (about the `Send` marker) is somewhat more confusing. Basically, it means that `tokio::Runtime::spawn` wants its argument future to also be `Send + 'static`, because tokio could move it to another thread. But your future cannot be sent between threads safely, because it contains a non-sendable type (`dyn futures::Stream<Item=bytes::bytes::Bytes, Error=actix_web::Error> + 'static`) inside the `actix_web::client::ClientRequest` type.

Probably the easiest way to fix the second error is to use the `actix::spawn` function instead of the `tokio::Runtime::spawn` method: it requires only the `'static` marker for its argument.

After these changes your program at least compiles.