How to correctly call async functions in a WebSocket handler in Actix-web

7.3k views Asked by At

I have made some progress with this, using into_actor().spawn(), but I am struggling to access the ctx variable inside the async block.

I'll start with showing a compiling snippet of the web socket handler, then a failing snippet of the handler, then for reference the full code example.

Working snippet:

Focus on the match case Ok(ws::Message::Text(text))

/// Handler for `ws::Message`
///
/// Ping/pong frames refresh the heartbeat timestamp, binary frames are echoed
/// back, and a text frame is treated as a command: it is passed to
/// `processrunner::run_process` and each line of its output is printed from a
/// future spawned on this actor's context.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        // process websocket messages
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                let future = async move {
                    // The command comes from the client, so a failure to start it is an
                    // expected condition: report it and end the task instead of panicking.
                    let mut reader = match processrunner::run_process(text).await {
                        Ok(reader) => reader,
                        Err(err) => {
                            eprintln!("failed to run process: {}", err);
                            return;
                        }
                    };

                    loop {
                        match reader.next_line().await {
                            Ok(Some(line)) => {
                                // `ctx` is only borrowed for the duration of `handle`, so it
                                // cannot be used inside this spawned future:
                                // ctx.text(line);
                                println!("line = {}", line);
                            }
                            // End of the process output.
                            Ok(None) => break,
                            Err(err) => {
                                eprintln!("failed to read process output: {}", err);
                                break;
                            }
                        }
                    }
                };

                future.into_actor(self).spawn(ctx);
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            // Protocol errors and any other frame types end the session.
            _ => ctx.stop(),
        }
    }
}

Not working snippet with ctx line uncommented.

/// Handler for `ws::Message`
///
/// NOTE: this variant is the one that fails to compile (E0495). The `async move`
/// block below captures `ctx`, a `&mut` borrow that is only valid for the body of
/// `handle`, while the future handed to `spawn` must be valid for the static
/// lifetime.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        // process websocket messages
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                let future = async move {
                    let reader = processrunner::run_process(text).await;
                    let mut reader = reader.ok().unwrap();
                    while let Some(line) = reader.next_line().await.unwrap() {
                         // This use of `ctx` inside the future is what triggers the error.
                         ctx.text(line);
                        println!("line = {}", line);
                    }
                };

                future.into_actor(self).spawn(ctx);
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => ctx.stop(),
        }
    }
}

Full code snippet split over two files.

main.rs

//! Simple echo websocket server.
//! Open `http://localhost:8080/ws/index.html` in browser
//! or [python console client](https://github.com/actix/examples/blob/master/websocket/websocket-client.py)
//! could be used for testing.
mod processrunner;
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_files as fs;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

/// Performs the WebSocket handshake for the request and starts a fresh
/// `MyWebSocket` actor to serve the connection.
///
/// Both the incoming request and the handshake outcome are printed for
/// debugging.
async fn ws_index(r: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    println!("{:?}", r);

    let actor = MyWebSocket::new();
    let handshake = ws::start(actor, &r, stream);
    println!("{:?}", handshake);

    handshake
}

/// Actor that serves a single WebSocket connection.
///
/// A WebSocket connection is long-running, so it is easier to handle with an
/// actor.
struct MyWebSocket {
    /// Time the last ping or pong was received from the client. If it is older
    /// than `CLIENT_TIMEOUT` (10 seconds) when the heartbeat task runs, the
    /// connection is dropped.
    hb: Instant,
}

impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;

    /// Runs once when the actor starts and schedules the heartbeat task.
    fn started(&mut self, ctx: &mut Self::Context) {
        Self::hb(self, ctx);
    }
}

/// Handler for `ws::Message`
///
/// Ping/pong frames refresh the heartbeat timestamp, binary frames are echoed
/// back, and a text frame is treated as a command: it is passed to
/// `processrunner::run_process` and each line of its output is printed from a
/// future spawned on this actor's context.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        // process websocket messages
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                let future = async move {
                    // The command comes from the client, so a failure to start it is an
                    // expected condition: report it and end the task instead of panicking.
                    let mut reader = match processrunner::run_process(text).await {
                        Ok(reader) => reader,
                        Err(err) => {
                            eprintln!("failed to run process: {}", err);
                            return;
                        }
                    };

                    loop {
                        match reader.next_line().await {
                            Ok(Some(line)) => {
                                // `ctx` is only borrowed for the duration of `handle`, so it
                                // cannot be used inside this spawned future:
                                // ctx.text(line);
                                println!("line = {}", line);
                            }
                            // End of the process output.
                            Ok(None) => break,
                            Err(err) => {
                                eprintln!("failed to read process output: {}", err);
                                break;
                            }
                        }
                    }
                };

                future.into_actor(self).spawn(ctx);
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            // Protocol errors and any other frame types end the session.
            _ => ctx.stop(),
        }
    }
}

impl MyWebSocket {
    /// Creates an actor whose heartbeat timestamp starts at the current instant.
    fn new() -> Self {
        Self { hb: Instant::now() }
    }

    /// Schedules the heartbeat task on `ctx`.
    ///
    /// Every `HEARTBEAT_INTERVAL` the task checks how long ago the client was
    /// last heard from. If that exceeds `CLIENT_TIMEOUT` the actor is stopped;
    /// otherwise a ping is sent to the client.
    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            // check client heartbeats
            if act.hb.elapsed() > CLIENT_TIMEOUT {
                // heartbeat timed out
                println!("Websocket Client heartbeat failed, disconnecting!");

                // stop actor
                ctx.stop();

                // don't try to send a ping
                return;
            }

            ctx.ping(b"");
        });
    }
}

/// Starts the HTTP server on 127.0.0.1:8080, serving the WebSocket endpoint at
/// `/ws/` and static files from `static/`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Default to info-level logs for the server and request logger, but let a
    // RUST_LOG value supplied by the user take precedence instead of
    // overwriting it.
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("actix_server=info,actix_web=info"),
    )
    .init();

    HttpServer::new(|| {
        App::new()
            // enable logger
            .wrap(middleware::Logger::default())
            // websocket route
            .service(web::resource("/ws/").route(web::get().to(ws_index)))
            // static files
            .service(fs::Files::new("/", "static/").index_file("index.html"))
    })
    // start http server on 127.0.0.1:8080
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

processrunner.rs

extern crate tokio;
use tokio::io::*;
use tokio::process::Command;

use std::process::Stdio;

/// Spawns `text` as a child process and returns a line reader over its stdout.
///
/// The child is awaited on a background task so that it can make progress while
/// the caller consumes the returned lines; its exit status is printed when it
/// finishes.
///
/// # Errors
///
/// Returns an error if the command cannot be spawned or if the child has no
/// stdout handle.
pub async fn run_process(
    text: String,
) -> std::result::Result<
    tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
    Box<dyn std::error::Error>,
> {
    let mut cmd = Command::new(text);
    // Pipe stdout back to us instead of inheriting it from this process.
    cmd.stdout(Stdio::piped());

    // Spawn failures (e.g. an unknown program name) are reported to the caller
    // rather than panicking.
    let mut child = cmd.spawn()?;

    let stdout = child
        .stdout
        .take()
        .ok_or("child did not have a handle to stdout")?;

    let lines = BufReader::new(stdout).lines();

    // Ensure the child process is spawned in the runtime so it can
    // make progress on its own while we await for any output.
    tokio::spawn(async move {
        match child.await {
            Ok(status) => println!("child status was: {}", status),
            Err(err) => eprintln!("child process encountered an error: {}", err),
        }
    });

    Ok(lines)
}

Error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/main.rs:57:41
   |
57 |                   let future = async move {
   |  _________________________________________^
58 | |                     let reader = processrunner::run_process(text).await;
59 | |                     let mut reader = reader.ok().unwrap();
60 | |                     while let Some(line) = reader.next_line().await.unwrap() {
...  |
63 | |                     }
64 | |                 };
   | |_________________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #2 defined on the method body at 45:5...
  --> src/main.rs:45:5
   |
45 | /     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
46 | |         // process websocket messages
47 | |         println!("WS: {:?}", msg);
48 | |         match msg {
...  |
74 | |         }
75 | |     }
   | |_____^
note: ...so that the types are compatible
  --> src/main.rs:57:41
   |
57 |                   let future = async move {
   |  _________________________________________^
58 | |                     let reader = processrunner::run_process(text).await;
59 | |                     let mut reader = reader.ok().unwrap();
60 | |                     while let Some(line) = reader.next_line().await.unwrap() {
...  |
63 | |                     }
64 | |                 };
   | |_________________^
   = note: expected  `&mut actix_web_actors::ws::WebsocketContext<MyWebSocket>`
              found  `&mut actix_web_actors::ws::WebsocketContext<MyWebSocket>`
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `actix::fut::FutureWrap<impl std::future::Future, MyWebSocket>` will meet its required lifetime bounds
  --> src/main.rs:66:41
   |
66 |                 future.into_actor(self).spawn(ctx);
   |                                         ^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0495`.

cargo

[package]
name = "removed"
version = "0.1.0"
authors = ["removed"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", features = ["full"] }
actix = "0.10"
actix-codec = "0.3"
actix-web = "3"
actix-web-actors = "3"
actix-files = "0.3"
awc = "2"
env_logger = "0.7"
futures = "0.3.1"
bytes = "0.5.3"
2

There are 2 answers

9
Njuguna Mureithi On BEST ANSWER

Here are the basics. You may need to do a little work here and there but this works.

use actix::prelude::*;
use tokio::process::Command;
use actix_web::{ web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use tokio::io::{ AsyncBufReadExt};
use actix::AsyncContext;
use tokio::stream::{ StreamExt};

use tokio::io::{BufReader};

use std::process::Stdio;

/// Actor message asking `MyWs` to run the wrapped string as a command.
#[derive(Message)]
#[rtype(result = "Result<(), ()>")]
struct CommandRunner(String);


/// WebSocket actor: one instance is started for each connection.
struct MyWs;

impl Actor for MyWs {
    // The actor runs inside a WebSocket context, which is what the handlers
    // use to send frames back to the client.
    type Context = ws::WebsocketContext<Self>;
}

/// One line read from the stdout of a spawned command.
#[derive(Debug)]
struct Line(String);

/// Handler for lines produced by a running command: each line is forwarded to
/// the client as a text frame.
impl StreamHandler<Result<Line, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<Line, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        match msg {
            Ok(line) => ctx.text(line.0),
            // Report the failure instead of silently dropping it.
            Err(err) => eprintln!("failed to read command output: {:?}", err),
        }
    }

    /// Called when a command's output stream ends.
    ///
    /// The default implementation stops the actor, which would close the
    /// WebSocket after the first command finishes. Doing nothing here keeps
    /// the connection open so further commands can be run.
    fn finished(&mut self, _ctx: &mut Self::Context) {}
}

/// Handler for `ws::Message`
///
/// Pings are answered with pongs, binary frames are echoed, and a text frame
/// is queued to this actor as a `CommandRunner` message.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => {
                ctx.notify(CommandRunner(text.to_string()));
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            // Answer the client's close frame and shut the actor down rather
            // than ignoring the request.
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => (),
        }
    }
}

impl Handler<CommandRunner> for MyWs {
    type Result = Result<(), ()>;
    fn handle(&mut self, msg: CommandRunner, ctx: &mut Self::Context) -> Self::Result  {
        let mut cmd = Command::new(msg.0);

        // Specify that we want the command's standard output piped back to us.
        // By default, standard input/output/error will be inherited from the
        // current process (for example, this means that standard input will
        // come from the keyboard and standard output/error will go directly to
        // the terminal if this process is invoked from the command line).
        cmd.stdout(Stdio::piped());

        let mut child = cmd.spawn()
            .expect("failed to spawn command");

        let stdout = child.stdout.take()
            .expect("child did not have a handle to stdout");

        let reader = BufReader::new(stdout).lines();
        
        // Ensure the child process is spawned in the runtime so it can
        // make progress on its own while we await for any output.
        let fut = async move {
            let status = child.await
                .expect("child process encountered an error");
    
            println!("child status was: {}", status);
        };
        let fut = actix::fut::wrap_future::<_, Self>(fut);
        ctx.spawn(fut);
        ctx.add_stream(reader.map(|l| Ok(Line(l.expect("Not a line")))));
        Ok(())
    }
}

async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    let resp = ws::start(MyWs {}, &req, stream);
    println!("{:?}", resp);
    resp
}

/// Serves the WebSocket endpoint at `/ws/` on 127.0.0.1:8080.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let app_factory = || App::new().route("/ws/", web::get().to(index));
    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}

Running `ls` looks like this:

Example

2
Saad Attieh On

So I just understood what was going wrong at the same time that I discovered the accepted answer.

The accepted answer proposes a clean solution, but I thought I would offer an alternative viewpoint. The code snippet I propose below makes fewer changes to my original attempt (as shown in the question), in the hope that it will demonstrate my fundamental misunderstanding.

The fundamental issue with my code is that I was ignoring the rule that "every actor has its own context". As you can see from the compile error in the question, Actix luckily uses the Rust compiler to enforce this rule.

Now that I understand that, it looks like the wrong thing I was trying to do is to spawn another actor and have that actor somehow move/copy in the original actor's context, just so it could respond with the process output lines. There is no need to do this of course, because the Actor model is all about letting Actors communicate by messages.

Instead, when spawning a new actor, I should have passed it the address of the original actor, allowing the newly spawned actor to send updates back. The original actor handles these messages (struct Line below) using a handler.

As I said, the accepted answer also does this but using a mapper which looks like a more elegant solution than my loop.

mod processrunner;
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_files as fs;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

/// Performs the WebSocket handshake for the request and starts a fresh
/// `MyWebSocket` actor to serve the connection.
///
/// Both the incoming request and the handshake outcome are printed for
/// debugging.
async fn ws_index(r: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    println!("{:?}", r);

    let actor = MyWebSocket::new();
    let handshake = ws::start(actor, &r, stream);
    println!("{:?}", handshake);

    handshake
}

/// Actor that serves a single WebSocket connection.
///
/// A WebSocket connection is long-running, so it is easier to handle with an
/// actor.
struct MyWebSocket {
    /// Time the last ping or pong was received from the client. If it is older
    /// than `CLIENT_TIMEOUT` (10 seconds) when the heartbeat task runs, the
    /// connection is dropped.
    hb: Instant,
}

impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;

    /// Runs once when the actor starts and schedules the heartbeat task.
    fn started(&mut self, ctx: &mut Self::Context) {
        Self::hb(self, ctx);
    }
}

/// Actor message carrying one line of process output to be forwarded to the
/// WebSocket client.
#[derive(Message)]
#[rtype(result = "()")]
pub struct Line {
    /// The text of the line.
    line: String,
}
impl Handler<Line> for MyWebSocket {
    type Result = ();

    fn handle(&mut self, msg: Line, ctx: &mut Self::Context) {
        ctx.text(msg.line);
    }
}

/// Handler for `ws::Message`
///
/// Ping/pong frames refresh the heartbeat timestamp, binary frames are echoed
/// back, and a text frame is treated as a command: it is passed to
/// `processrunner::run_process` and each line of its output is sent back to
/// this actor as a `Line` message.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        // process websocket messages
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => {
                self.hb = Instant::now();
                ctx.pong(&msg);
            }
            Ok(ws::Message::Pong(_)) => {
                self.hb = Instant::now();
            }
            Ok(ws::Message::Text(text)) => {
                // The spawned future cannot borrow `ctx`, so it reports output
                // through the actor's own address instead.
                let recipient = ctx.address().recipient();
                let future = async move {
                    // The command comes from the client, so a failure to start it is an
                    // expected condition: report it and end the task instead of panicking.
                    let mut reader = match processrunner::run_process(text).await {
                        Ok(reader) => reader,
                        Err(err) => {
                            eprintln!("failed to run process: {}", err);
                            return;
                        }
                    };

                    loop {
                        match reader.next_line().await {
                            Ok(Some(line)) => {
                                println!("line = {}", line);
                                recipient.do_send(Line { line });
                            }
                            // End of the process output.
                            Ok(None) => break,
                            Err(err) => {
                                eprintln!("failed to read process output: {}", err);
                                break;
                            }
                        }
                    }
                };

                future.into_actor(self).spawn(ctx);
            }
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            // Protocol errors and any other frame types end the session.
            _ => ctx.stop(),
        }
    }
}

impl MyWebSocket {
    /// Creates an actor whose heartbeat timestamp starts at the current instant.
    fn new() -> Self {
        Self { hb: Instant::now() }
    }

    /// Schedules the heartbeat task on `ctx`.
    ///
    /// Every `HEARTBEAT_INTERVAL` the task checks how long ago the client was
    /// last heard from. If that exceeds `CLIENT_TIMEOUT` the actor is stopped;
    /// otherwise a ping is sent to the client.
    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
            // check client heartbeats
            if act.hb.elapsed() > CLIENT_TIMEOUT {
                // heartbeat timed out
                println!("Websocket Client heartbeat failed, disconnecting!");

                // stop actor
                ctx.stop();

                // don't try to send a ping
                return;
            }

            ctx.ping(b"");
        });
    }
}

/// Starts the HTTP server on 127.0.0.1:8080, serving the WebSocket endpoint at
/// `/ws/` and static files from `static/`.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Default to info-level logs for the server and request logger, but let a
    // RUST_LOG value supplied by the user take precedence instead of
    // overwriting it.
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("actix_server=info,actix_web=info"),
    )
    .init();

    HttpServer::new(|| {
        App::new()
            // enable logger
            .wrap(middleware::Logger::default())
            // websocket route
            .service(web::resource("/ws/").route(web::get().to(ws_index)))
            // static files
            .service(fs::Files::new("/", "static/").index_file("index.html"))
    })
    // start http server on 127.0.0.1:8080
    .bind("127.0.0.1:8080")?
    .run()
    .await
}