Can't get async closure to work with Warp::Filter


I am trying to get an async closure working in the and_then filter from Warp.

This is the smallest example I could come up with where I am reasonably sure I didn't leave any important details out:

use std::{convert::Infallible, sync::Arc, thread, time};
use tokio::sync::RwLock;
use warp::Filter;

fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(&man).await });
}

async fn GetAvailableBinaries(man: &Manifest) -> Result<impl warp::Reply, Infallible> {
    Ok(warp::reply::json(&man.GetAvailableBinaries().await))
}

pub struct Manifest {
    binaries: Arc<RwLock<Vec<i32>>>,
}

impl Manifest {
    pub fn new() -> Manifest {
        let bins = Arc::new(RwLock::new(Vec::new()));

        thread::spawn(move || async move {
            loop {
                thread::sleep(time::Duration::from_millis(10000));
            }
        });

        Manifest { binaries: bins }
    }

    pub async fn GetAvailableBinaries(&self) -> Vec<i32> {
        self.binaries.read().await.to_vec()
    }
}

I am using:

[dependencies]
tokio = { version = "0.2", features = ["full"] }
warp = { version = "0.2", features = ["tls"] }

The error is:

error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
 --> src/main.rs:9:48
  |
9 |     let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(&man).await });
  |                                       -------- ^^^^^^^^^^^^^ ------------------------------------ closure is `FnOnce` because it moves the variable `man` out of its environment
  |                                       |        |
  |                                       |        this closure implements `FnOnce`, not `Fn`
  |                                       the requirement to implement `Fn` derives from here

There are 2 answers

Answer by Shepmaster:

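After making Manifest implement Clone (for example with #[derive(Clone)]), you can fix the error by controlling when the manifest is cloned. The handler function is also renamed here to the idiomatic snake_case get_available_binaries: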

fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(move || {
        let man = man.clone();
        async move { get_available_binaries(&man).await }
    });

    warp::serve(check);
}

This moves man into the closure passed to and_then, then provides a clone of man to the async block each time the closure is executed. The async block then owns that data and can take a reference to it without worrying about executing the future after the data has been deallocated.
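
The ownership flow described above can be sketched without warp or tokio. This is a minimal sketch under stated assumptions, not warp's actual API: call_twice is a hypothetical stand-in for and_then that only reproduces the Fn bound, block_on is a toy executor written for this example, Manifest::new takes an initial vector purely for illustration, and std's RwLock replaces tokio's so the example has no dependencies.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the question's `Manifest` (std RwLock, not tokio's).
#[derive(Clone)]
pub struct Manifest {
    binaries: Arc<RwLock<Vec<i32>>>,
}

impl Manifest {
    pub fn new(initial: Vec<i32>) -> Manifest {
        Manifest { binaries: Arc::new(RwLock::new(initial)) }
    }
}

async fn get_available_binaries(man: &Manifest) -> Vec<i32> {
    man.binaries.read().unwrap().to_vec()
}

// A waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for `and_then`: requires `Fn` and calls the handler
// once per simulated request.
fn call_twice<F, Fut>(handler: F) -> (Fut::Output, Fut::Output)
where
    F: Fn() -> Fut,
    Fut: Future,
{
    (block_on(handler()), block_on(handler()))
}

pub fn demo() -> (Vec<i32>, Vec<i32>) {
    let man = Manifest::new(vec![1, 2, 3]);
    // The closure owns `man`; each call clones it and moves the clone into
    // the future, so the closure itself never gives `man` away.
    call_twice(move || {
        let man = man.clone();
        async move { get_available_binaries(&man).await }
    })
}
```

Calling demo() yields the same vector twice, which shows that the closure can be invoked repeatedly. Cloning Manifest only increments the Arc reference count; the Vec behind the lock is shared, not duplicated.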

Answer by AdaShoelace:

I'm not sure this is what you're going for, but this solution builds for me:

use std::{convert::Infallible, sync::Arc, thread, time};
use tokio::sync::RwLock;
use warp::Filter;

fn main() {
    let man = Manifest::new();

    let check = warp::path("updates").and_then(|| async { GetAvailableBinaries(&man).await });
}

async fn GetAvailableBinaries(man: &Manifest) -> Result<impl warp::Reply, Infallible> {
    Ok(warp::reply::json(&man.GetAvailableBinaries().await))
}

#[derive(Clone)]
pub struct Manifest {
    binaries: Arc<RwLock<Vec<i32>>>,
}

impl Manifest {
    pub fn new() -> Manifest {
        let bins = Arc::new(RwLock::new(Vec::new()));

        thread::spawn(move || async {
            loop {
                thread::sleep(time::Duration::from_millis(10000));
                //mutate bins here
            }
        });

        Manifest { binaries: bins }
    }

    pub async fn GetAvailableBinaries(&self) -> Vec<i32> {
        self.binaries.read().await.to_vec()
    }
}

The move is the reason the compiler rejected the original line: let check = warp::path("updates").and_then(|| async move { GetAvailableBinaries(&man).await });. With async move, the async block takes ownership of every variable it uses, so man is moved out of the closure and into the future the first time the closure is called. A closure that gives away a captured value like this can only be called once, so the compiler infers FnOnce rather than the Fn that and_then requires. Without move, the async block only borrows man, and the closure can be called repeatedly.
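
The same rule can be seen without any async code. The following is a minimal sketch in which call_once and call_many are hypothetical helpers (call_many plays the role of and_then's Fn bound) and a String stands in for Manifest; returning the captured value by value is analogous to handing it to an async move block.

```rust
// Accepts any closure, including one that can run only once.
fn call_once<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    f()
}

// Requires a closure that can be called repeatedly, like `and_then` does.
fn call_many<F, R>(f: F, times: usize) -> Vec<R>
where
    F: Fn() -> R,
{
    (0..times).map(|_| f()).collect()
}

pub fn moved_out() -> String {
    let man = String::from("manifest");
    // Returning `man` by value moves it out of the closure: `FnOnce` only.
    // Passing this closure to `call_many` would fail with error E0525.
    call_once(move || man)
}

pub fn borrowed() -> Vec<usize> {
    let man = String::from("manifest");
    // Only reads `man` through a shared reference: the closure is `Fn`.
    call_many(|| man.len(), 3)
}

pub fn cloned_per_call() -> Vec<String> {
    let man = String::from("manifest");
    // Owns `man` but hands out a fresh clone on every call: still `Fn`.
    call_many(move || man.clone(), 2)
}
```

The borrowed variant corresponds to dropping move from the async block, and the cloned_per_call variant corresponds to cloning inside the closure before the async move block.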