How to find if tokio::sync::mpsc::Receiver has been closed?

I have a loop where I do some work and send the result through a Sender. The work takes time, and I need to retry it in case of failure. It's possible that the receiver is closed while I'm retrying, in which case my retries are a waste of time. Because of this, I need a way to check whether the Receiver is still available without sending a message.

In an ideal world, I want my code to look like this in pseudocode:

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
    // do some stuff with rx and drop it after some time
    rx.recv(...).await;
});

let mut attempts = 0;
loop {
    if tx.is_closed() {
        break;
    }
    if let Ok(result) = do_work().await {
        attempts = 0;
        let _ = tx.send(result).await;
    } else {
        if attempts >= 10 {
            break;
        } else {
            attempts += 1;
            continue;
        }
    }
};

The problem is that Sender doesn't have an is_closed method. It does have pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>, but I don't know what Context is or where I can find it.

When I don't have a value to send, how can I check if the sender is able to send?

There are 2 answers

justinas On BEST ANSWER

Sender has a try_send method:

Attempts to immediately send a message on this Sender.

This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).

Use it instead of send and check for the error:

if let Err(TrySendError::Closed(_)) = tx.try_send(result) {
    break;
}

It is possible to do what you want by using poll_fn from the futures crate. It adapts a function returning Poll into a Future:

use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22

fn wait_until_ready<'a, T>(
    sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
    poll_fn(move |cx| sender.poll_ready(cx))
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel::<i32>(1);

    tokio::spawn(async move {
        // Receive one value and close the channel;
        let val = rx.recv().await;
        println!("{:?}", val);
    });

    wait_until_ready(&mut tx).await.unwrap();
    tx.send(123).await.unwrap();

    wait_until_ready(&mut tx).await.unwrap();
    delay_for(std::time::Duration::from_secs(1)).await;
    tx.send(456).await.unwrap(); // 456 likely never printed out,
                                 // despite having a positive readiness response
                                 // and the send "succeeding"
}

Note, however, that in the general case this is susceptible to a TOCTOU (time-of-check to time-of-use) race. Even though Sender's poll_ready reserves a slot in the channel for later use, the receiving end may be closed between the readiness check and the actual send. The comments in the code point out where this can happen.

John Kugelman On

Send a null message that the receiver ignores. It could be anything. For example, if you're sending T now you could change it to Option<T> and have the receiver ignore Nones.

Yeah, that will work, although I don't really like this approach, since it requires changing the communication format.

I wouldn't get hung up on the communication format. This isn't a well-defined network protocol that should be isolated from implementation details; it's an internal communication mechanism between two pieces of your own code.