Programmatically create and listen for multiple signals and a ticking interval with tokio

842 views Asked by At

I'm trying to programmatically listen to multiple signals and a ticking interval. To clear things up, here is the code I have currently:

use std::time::Duration;

use tokio::signal::unix::SignalKind;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut interval = tokio::time::interval(Duration::from_secs(5));

    let sigrtmin = libc::SIGRTMIN();
    let program_name = std::env::args().next().unwrap();
    println!(
        "Try executing this command:\nkill -{} $(pidof {})\n",
        sigrtmin + 2,
        program_name
    );

    let mut signal0 = tokio::signal::unix::signal(SignalKind::from_raw(sigrtmin))?;
    let mut signal1 = tokio::signal::unix::signal(SignalKind::from_raw(sigrtmin + 1))?;
    let mut signal2 = tokio::signal::unix::signal(SignalKind::from_raw(sigrtmin + 2))?;
    let mut signal3 = tokio::signal::unix::signal(SignalKind::from_raw(sigrtmin + 3))?;

    loop {
        tokio::select! {
            _ = signal0.recv() => {
                println!("Got signal 0.");
            }
            _ = signal1.recv() => {
                println!("Got signal 1.");
            }
            _ = signal2.recv() => {
                println!("Got signal 2.");
            }
            _ = signal3.recv() => {
                println!("Got signal 3.");
            }
            _ = interval.tick() => {
                println!("Tick.");
            }
        }
    }
}

So far, so good. But I just can't figure out how to programmatically (e.g. with a loop or a .collect()) create new signals and listen for them in the select!.

How would I go about doing that?

1

There are 1 answers

0
Caesar On

In this kind of situation, I tend to reach for channels, because

  • I know they're cancel-safe and will resume with any unreceived element.
  • They're easy to use. Just create a sender per event you want to watch for, and use timeout on the receiving end.

That being said, they're probably not the most elegant or efficient solution. A clean solution would probably use futures::stream::select_all and tokio_stream::wrappers::SignalStream, see comments on the question. But that's a bit more difficult to understand and set up.

Anyway, the following might work:

use std::time::Duration;
use libc::{SIGUSR1, SIGUSR2};
use tokio::{
    signal::unix::SignalKind,
    sync::mpsc,
    time::{timeout_at, Instant},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (send, mut recv) = mpsc::channel::<i32>(42);
    for &signum in [SIGUSR1, SIGUSR2].iter() {
        let send = send.clone();
        let mut sig = tokio::signal::unix::signal(SignalKind::from_raw(signum))?;
        tokio::spawn(async move {
            loop {
                sig.recv().await;
                if send.send(signum).await.is_err() {
                    break;
                };
            }
        });
    }

    let mut tick = Instant::now();
    loop {
        match timeout_at(tick, recv.recv()).await {
            Ok(Some(id)) => println!("Signal {id}"),
            Err(_elapsed) => {
                println!("Tick!");
                tick += Duration::from_secs(5);
            }
            Ok(None) => unreachable!(),
        }
    }
}

If you want to treat more different kinds of events, you'll have to define yourself an enum to be sent over the channel.