I'm building a gRPC server using Rust and tonic and having trouble with a function that returns a stream. The only examples I've seen so far conveniently create the tx and rx channels inside the function, which isn't much help if you need to receive data from some other part of the application. I have the following code, but I'm getting an error.

Code

use std::sync::Arc;
use std::sync::Mutex;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use resourcemanager::{LineRequest, LineResponse, Position};
use resourcemanager::resource_manager_server::{ResourceManager, ResourceManagerServer};

pub mod resourcemanager {
    tonic::include_proto!("resourcemanager");
}

#[derive(Debug)]
pub struct ResourceManagerService {
    linear_rx: mpsc::Receiver<Position>,
    linear_tx: mpsc::Sender<Position>
}

#[tonic::async_trait]
impl ResourceManager for ResourceManagerService {
    async fn draw_line(&self, request: Request<LineRequest>) -> Result<Response<LineResponse>, Status> {
        Ok(Response::new(LineResponse::default()))
    }

    type StreamLinearMotorMovementStream = ReceiverStream<Result<Position, Status>>;

    async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    ) -> Result<Response<Self::StreamLinearMotorMovementStream>, Status> {
        println!("Streaming motor movements");
        let (tx, mut rx) = mpsc::channel(1);

        tokio::spawn(async move {
            while let Some(received) = self.linear_rx.recv().await {
                tx.send(Ok(received.clone())).await.unwrap();
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

fn main() {
    println!("Hello, world!");
}

Error

error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
   --> src/main.rs:30:10
    |
30  |         &self,
    |          ^^^^ this data with lifetime `'life0`...
...
36  |         tokio::spawn(async move {
    |         ------------ ...is used and required to live as long as `'static` here
    |
note: `'static` lifetime requirement introduced by this bound
   --> /Users/xxxxxxxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.18.2/src/task/spawn.rs:127:28
    |
127 |         T: Future + Send + 'static,
    |                            ^^^^^^^

This error is shown underneath &self in:

async fn stream_linear_motor_movement(
        &self,
        request: Request<()>
    )

Answer by user2722968 (best answer)

The error message basically says it all. Abbreviated:

async fn stream_linear_motor_movement(&self) {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(received) = self.linear_rx.recv().await {}
    });
}

The expression self.linear_rx.recv().await inside the newly spawned task makes the async move block capture self (here, the reference &self) so that the task can access self.linear_rx. However, because the new task may outlive the call that spawned it (it could run forever), tokio::spawn requires everything the task captures to be 'static, while &self has a limited lifetime 'life0 (whatever that turns out to be) that may be shorter than 'static. In other words, you can't access self (or anything borrowed from it) from within the newly spawned task, because there is no guarantee that it will still be around while the task is executing.
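
To see the 'static bound in isolation, here is a minimal sketch that uses std::thread::spawn, which places the same 'static requirement on its closure that tokio::spawn places on its future. The Service type, its label field, and spawn_label_len are hypothetical stand-ins rather than anything from the question; the point is only that the spawned work has to own its data instead of borrowing it from &self.

```rust
use std::thread;

// Hypothetical stand-in for the service; not part of the original code.
struct Service {
    label: String,
}

impl Service {
    // Using `self` inside the spawned closure is rejected for the same
    // reason as in the question: the closure must be `'static`, but
    // `&self` is only known to be valid for the duration of this call.
    //
    //     thread::spawn(move || self.label.len()); // does not compile
    //
    // Cloning the field first gives the closure an owned `String`, which
    // contains no borrowed data and therefore satisfies `'static`.
    fn spawn_label_len(&self) -> thread::JoinHandle<usize> {
        let label = self.label.clone();
        thread::spawn(move || label.len())
    }
}

fn main() {
    let service = Service { label: String::from("linear") };
    let handle = service.spawn_label_len();
    // The service can be dropped right away; the thread owns its own copy.
    drop(service);
    println!("{}", handle.join().unwrap());
}
```

The same reasoning carries over to the async case: whatever the spawned task touches has to be moved into it as an owned value.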

What you can do is wrap linear_rx in ResourceManagerService in an Arc, .clone() that Arc in stream_linear_motor_movement, and move the clone into the task. Because Receiver::recv takes &mut self, the Arc needs a mutex inside it, e.g. Arc<tokio::sync::Mutex<mpsc::Receiver<Position>>>. Depending on what you are trying to accomplish, you can instead store linear_rx in an Option and .take() it in stream_linear_motor_movement, leaving None in its place; since the method only receives &self, that Option also has to sit behind a Mutex. In both cases you transfer an owned value into the newly spawned task, and an owned value that contains no borrows satisfies 'static. Note that the Arc lets you call stream_linear_motor_movement many times (although concurrent streams would then compete for messages from the single receiver), while the Option lets the call succeed only once, because linear_rx is moved out on the first call.
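
Both options can be sketched with the standard library alone, again using std::thread::spawn and std::sync::mpsc as stand-ins for tokio::spawn and tokio::sync::mpsc. SharedService, OneShotService, and their stream methods are hypothetical names, and u32 stands in for Position. In the real tonic handler the Arc variant would most likely use tokio::sync::Mutex with .lock().await, since the guard is held while waiting for messages; treat this as an illustration of the ownership pattern, not a drop-in fix.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Option 1 (hypothetical type): shared ownership of the receiver.
struct SharedService {
    rx: Arc<Mutex<Receiver<u32>>>,
}

// Option 2 (hypothetical type): the receiver can be taken out once.
struct OneShotService {
    rx: Mutex<Option<Receiver<u32>>>,
}

impl SharedService {
    // Can be called any number of times; each call clones the Arc.
    fn stream(&self) -> thread::JoinHandle<Vec<u32>> {
        let shared = Arc::clone(&self.rx); // owned handle, no borrow of `self`
        thread::spawn(move || {
            let guard = shared.lock().unwrap();
            // Collect values until every sender has been dropped.
            let items: Vec<u32> = guard.iter().collect();
            items
        })
    }
}

impl OneShotService {
    // Returns `None` on every call after the first.
    fn stream(&self) -> Option<thread::JoinHandle<Vec<u32>>> {
        // `take()` moves the receiver out and leaves `None` behind.
        let rx = self.rx.lock().unwrap().take()?;
        Some(thread::spawn(move || {
            let items: Vec<u32> = rx.iter().collect();
            items
        }))
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let shared = SharedService { rx: Arc::new(Mutex::new(rx)) };
    let handle = shared.stream();
    for i in 0..3 {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the channel ends the worker's loop
    println!("shared: {:?}", handle.join().unwrap());

    let (tx, rx) = mpsc::channel();
    let one_shot = OneShotService { rx: Mutex::new(Some(rx)) };
    let first = one_shot.stream();
    let second = one_shot.stream(); // receiver is already gone
    tx.send(7).unwrap();
    drop(tx);
    println!("second call got a stream: {}", second.is_some());
    println!("one-shot: {:?}", first.unwrap().join().unwrap());
}
```

Which variant fits depends on whether the RPC is expected to be called more than once. If several clients should each see every position, a single mpsc receiver is not enough regardless of how it is wrapped, because each message is delivered to exactly one consumer.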