How to call async function inside ActorHandler in actix web?

What am I trying to achieve?

Calling async function inside the ActorHandler implementation

Code

Here is the struct:

    pub struct WsConnectionManager {
        sessions: HashMap<String, Socket>,
        redis_broadcast_manager: RedisData, // redis client
    }

I implemented a few methods on WsConnectionManager that access sessions and redis_broadcast_manager and perform certain actions:

    impl WsConnectionManager {
        pub fn new(redis_broadcast_manager: RedisData) -> WsConnectionManager {
            WsConnectionManager {
                sessions: HashMap::new(),
                redis_broadcast_manager,
            }
        }
    
        pub fn send_message(&mut self, message: &str, message_type: String, id_to: &String) {
            if let Some(socket_recipient) = self.sessions.get(id_to) {
                let _ = socket_recipient.do_send(WsMessage {
                    message_type,
                    data: message.to_owned(),
                });
            } else {
                println!("attempting to send message but couldn't find user id.");
            }
        }
    
        pub async fn remove_stream(&mut self, stream_id: &str) {
            self.redis_broadcast_manager
                .remove_value(stream_id)
                .await
                .unwrap();
        }
    
        pub async fn get_stream_viewers(&mut self, stream_id: &str) -> Vec<BroadcastState> {
            let raw_stream_viewers = self
                .redis_broadcast_manager
                .get_value(stream_id)
                .await
                .unwrap();
    
            let stream_viewers: Vec<BroadcastState> =
                serde_json::from_str(raw_stream_viewers.as_ref().unwrap().as_str()).unwrap();
    
            stream_viewers
        }
    
        pub async fn set_stream_viewers(&mut self, stream_id: &str, viewers: Vec<BroadcastState>) {
            let viewers_json = serde_json::to_string(&viewers).unwrap();
            self.redis_broadcast_manager
                .set_value(stream_id, &viewers_json)
                .await
                .unwrap();
        }
    }

Then I implemented the Actor trait:

    impl Actor for WsConnectionManager {
        type Context = Context<Self>;
    }

Then I implemented the Handler trait for the different message types:

    impl Handler<Disconnect> for WsConnectionManager {
        type Result = ();
    
        fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
            let key = format!("{}:{}", msg.user_id, msg.client_id);
            if self.sessions.remove(&key).is_some() {
                // send message to all other users in the room?
                println!("DISCONNECTED: {}", key);
            }
        }
    }
    
    impl Handler<Connect> for WsConnectionManager {
        type Result = ();
    
        fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
            let key = format!("{}:{}", msg.user_id, msg.client_id);
            self.sessions.insert(key.clone(), msg.addr);
    
            // self.send_message(&format!("your id is {}", key.clone()), &key);
    
            println!("ACTIVE SESSION COUNT: {}", self.sessions.len());
        }
    }
    
    impl Handler<BroadcastEvent> for WsConnectionManager {
        type Result = ();
    
        fn handle(&mut self, msg: BroadcastEvent, ctx: &mut Context<Self>) -> Self::Result {
            let key = format!("player:{}:{}", msg.user_id, msg.client_id);
    
            if msg.data.action == "broadcast_start" {
                self.set_stream_viewers(&key, vec![]).await; // <--- error here
            } else {
                println!("UNKNOWN ACTION: {}", msg.data.action);
            }
        }
    }

Since self.set_stream_viewers(&key, vec![]) is async, I need to .await it, but I can't declare handle as an async fn in impl Handler<BroadcastEvent> for WsConnectionManager.

How can I call the async function inside the Handler implementation? Is there any other way/approach to go about this?

What have I tried so far?

  1. Moving self.set_stream_viewers(&key, vec![]).await inside an async move block. That gives the error "lifetime may not live long enough: returning this value requires that '1 must outlive 'static", which might be solved by cloning self, but that is not efficient.

  2. Using block_in_place:

         block_in_place(|| {
             tokio::runtime::Runtime::new().unwrap().block_on(async {
                 self.set_stream_viewers(&key, vec![]).await;
                 println!("STARTING STREAM: {}", key);
             });
         })

     This gives a runtime error: can call blocking only when running on the multi-threaded runtime

  3. Using actix::spawn:

         actix::spawn(async move {
             // remove entry from redis and broadcast end stream to everyone
             self.remove_stream(&key).await;
             println!("STOPPING STREAM: {}", key);
         });

     This fails to compile with:

         borrowed data escapes outside of method
         `self` escapes the method body here
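
Attempts 1 and 3 fail for the same reason: the future borrows self, but a spawned task must be 'static. One possible direction is to clone only the Redis handle (not the whole manager) and return a future that owns everything it uses. The sketch below shows that shape with the standard library only. FakeRedis, Manager, block_on, and demo are hypothetical stand-ins, not code from the question or the repo, and the sketch assumes RedisData is cheap to clone (for example, because it wraps a shared connection).

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `RedisData`: every clone shares one store
// behind an `Arc`, so cloning the handle is cheap. This is an assumption.
#[derive(Clone, Default)]
pub struct FakeRedis {
    store: Arc<Mutex<HashMap<String, String>>>,
}

impl FakeRedis {
    pub async fn set_value(&self, key: &str, value: &str) {
        self.store.lock().unwrap().insert(key.to_owned(), value.to_owned());
    }

    pub async fn get_value(&self, key: &str) -> Option<String> {
        let value = self.store.lock().unwrap().get(key).cloned();
        value
    }
}

// Hypothetical stand-in for `WsConnectionManager`.
pub struct Manager {
    redis: FakeRedis,
}

impl Manager {
    // Not an `async fn`: the handle is cloned synchronously, and the returned
    // boxed future owns the clone, the key, and the payload. It therefore does
    // not borrow `self` and satisfies a `'static` bound.
    pub fn set_stream_viewers(
        &self,
        key: String,
        viewers_json: String,
    ) -> Pin<Box<dyn Future<Output = ()>>> {
        let redis = self.redis.clone();
        Box::pin(async move {
            redis.set_value(&key, &viewers_json).await;
        })
    }
}

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch runs without tokio or actix.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> Option<String> {
    let redis = FakeRedis::default();
    let manager = Manager { redis: redis.clone() };
    let fut = manager.set_stream_viewers("player:1:2".to_owned(), "[]".to_owned());
    // Dropping the manager before running the future shows that the future
    // holds no borrow of it.
    drop(manager);
    block_on(fut);
    block_on(redis.get_value("player:1:2"))
}
```

In actix, a future of this shape could presumably be returned from the handler by declaring type Result = ResponseFuture<()>, or run in the background with ctx.spawn(fut.into_actor(self)). Neither variant has been tested against the repo here. The block_in_place error in attempt 2 is consistent with actix running its workers on single-threaded runtimes.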

Here's my Cargo.toml:

    [dependencies]
    actix-web = "4.5.1"
    actix-web-actors = "4.3.0"
    actix = "0.13.3"
    uuid = { version = "0.8", features = ["v4", "serde"] }
    serde = { version = "1.0.196", features = ["derive"] }
    serde_json = "1.0.113"
    redis = { version = "0.24.0", features = ["tokio-comp"] }
    env_logger = "0.11.2"
    dotenv = "0.15.0"
    actix-cors = "0.7.0"
    actix-files = "0.6.5"
    tokio = { version = "1", features = ["full"] }

GitHub repo: https://github.com/souvikinator/ws-server-rust
