Rust & Tokio: How to gracefully shut down a custom server running in a loop using a custom signal handler?


I'm building a Tokio server that provides multiple services such as HTTP and gRPC, and more recently handles ZMQ messages. Graceful shutdown for HTTP and gRPC is handled by a custom signal handler that captures a number of signals in Unix / Windows environments. The signal handler is based on this SO answer:

Rust & Tokio: How to handle more signals than just sigint i.e. sigquit?

Now I'm adding a ZeroMQ message handler, and unlike HTTP/gRPC, the shutdown must be handled manually. Therefore, I pass the custom shutdown signal as a parameter, and once it triggers, it breaks the main loop.

The main idea is quite simple:

impl Server {
    pub async fn run(
        self,
        signal: impl Future<Output=()> + Send + 'static,
    ) -> Result<()> {
        let mut socket = self.socket;

        loop {
            select! {

                _ = signal => {
                    break;
                }

                msg = socket.next() => {
                    if let Some(msg) = msg {
                        println!(
                            "Subscribe: {:?}",
                            msg
                            .expect("Failed to get message")
                            .iter()
                                .map(|item| item.as_str().unwrap_or("invalid text"))
                                .collect::<Vec<&str>>()
                        );
                    }
                },

            }
        }

        Ok(())
    }
} 

Full code as gist: https://gist.github.com/rust-play/868cb8c78d7122aaebfe0f34eead716e

Without the message handling, with just the signal handler in the main loop, the code compiles and works as expected. When I added the message handling, I got a compiler error:

 use of moved value: `signal`
  --> services/qdgw/src/service.rs:27:21
   |
20 |         signal: impl Future<Output=()> +...
   |         ------ move occurs because `signal` has type `impl Future<Output = ()> + Send + 'static`, which does not implement the `Copy` trait
...
24 |         loop {
   |         ---- inside of this loop
...
27 |                 _ = signal => {
   |                     ^^^^^^ value moved here, in previous iteration of loop
   |
help: consider further restricting this bound
   |
20 |         signal: impl Future<Output=()> + Send + 'static + std::marker::Copy,
   |                                                         +++++++++++++++++++

The error is quite clear: signal is moved in the first iteration of the loop, and its type doesn't implement Copy. However, here is the problem:

As far as I know, a future with Output=() cannot implement Copy anyway, so the fix suggested by the compiler doesn't work.

The signal handler doesn't have to sit inside the loop, but if it's outside the loop, then I have to fire another signal to break the loop, and I cannot figure out how to solve this.

I don't think this is particularly hard, but I don't have enough experience with Tokio yet to figure it out.

Any help is appreciated.

Thank you!

** Solution **

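Once the signal future is pinned and passed to select! as a mutable reference, the code compiles and works correctly. select! takes each branch's future by value, so passing signal directly moves it in the first iteration; &mut future only borrows it, so the same future can be polled again on every pass. See the full code below and the pin macro documentation for details.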

impl Server {
    pub async fn run(
        self,
        signal: impl Future<Output=()> + Send + 'static,
    ) -> Result<()> {

        let mut socket = self.socket;

        // If it is required to call .await on a &mut _ reference,
        // the caller is responsible for pinning the future.
        // https://docs.rs/tokio/latest/tokio/macro.pin.html#examples
        let future = signal;
        pin!(future);

        loop {
            select! {
                _ = &mut future => {
                    break;
                }

                msg = socket.next() => {
                    if let Some(msg) = msg {
                        println!(
                            "Subscribe: {:?}",
                            msg
                            .expect("Failed to get message")
                            .iter()
                                .map(|item| item.as_str().unwrap_or("invalid text"))
                                .collect::<Vec<&str>>()
                        );
                    }
                },

            }
        }

        Ok(())
    }
}
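
To see why pinning helps, here is a minimal sketch that uses only the standard library rather than Tokio. It polls a pinned future once per loop iteration through a reborrow, which is roughly what select! does when it is handed &mut future. Countdown, NoopWake, and run_until_signal are hypothetical names made up for this illustration; they are not part of Tokio or of the original code, and the busy loop only stands in for a real executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the shutdown signal:
/// returns Pending `self.0` times, then completes.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for a loop that polls eagerly.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `signal` once per iteration and returns how many iterations
/// ran before the signal completed.
fn run_until_signal(signal: impl Future<Output = ()>) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Pin once, outside the loop. `signal` is now a `Pin<&mut _>`.
    let mut signal = pin!(signal);

    let mut iterations = 0;
    loop {
        // `as_mut()` reborrows the pinned future instead of moving it,
        // so the same future is still available on the next iteration.
        if signal.as_mut().poll(&mut cx).is_ready() {
            break;
        }
        // In the real server, this is where a message would be handled.
        iterations += 1;
    }
    iterations
}

fn main() {
    let n = run_until_signal(Countdown(3));
    println!("loop ran {n} times before shutdown");
}
```

Handing the future itself to each iteration would consume it on the first pass, which is exactly the move the compiler reported. The reborrow keeps ownership outside the loop, so every iteration polls the same future.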