How to take ownership of a Mutex of a tokio_postgres::Client with an async task?

770 views Asked by At

I want to create a function that returns a tokio_postgres client. However, I can't find a solution to take ownership of variable (a database connection from the library tokio_postgres) in an async task (to connect to the database).

Here is my code (Playground):

use std::sync::{Arc, Mutex};
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{Client, Connection, Error, NoTls, Socket};

#[tokio::main] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
    let pg_client = create_pg_client("postgres://postgres:root@localhost:5432");
    // Use pg_client for the program
    Ok(())
}

//
// Creates a pg client
//
pub async fn create_pg_client(
    config: &str,
) -> Result<Arc<Mutex<(Client, Connection<Socket, NoTlsStream>)>>, Error> {
    // Connect to the database.
    let mut connect_result = Arc::new(Mutex::new(tokio_postgres::connect(config, NoTls).await?));

    let connect_result_thread = connect_result.clone();
    // The connection object performs the actual communication with the database,
    // so spawn it off to run on its own.
    tokio::spawn(async move {
        let mut result = connect_result_thread.lock().unwrap();
        if let Err(e) = (&mut result.1).await {
            eprintln!("An error occured while trying to connect to the database");
        }
    });

    Ok(connect_result)
}

My code doesn't compile:

error: future cannot be sent between threads safely
   --> src\pg_client.rs:18:5
    |
18  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: src\github.com-1ecc6299db9ec823\tokio-1.5.0\src\task\spawn.rs:129:21
    |
129 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, (Client, tokio_postgres::Connection<Socket, NoTlsStream>)>`
note: future is not `Send` as this value is used across an await
   --> src\pg_client.rs:20:25
    |
19  |         let mut result = connect_result_thread.lock().unwrap();
    |             ---------- has type `std::sync::MutexGuard<'_, (Client, tokio_postgres::Connection<Socket, NoTlsStream>)>` which is not `Send`
20  |         if let Err(e) = (*result).1.await {
    |                         ^^^^^^^^^^^^^^^^^ await occurs here, with `mut result` maybe used later
...
23  |     });
    |     - `mut result` is later dropped here

It says that future cannot be sent between threads safely. Is it possible to achieve what I want?

Crates used :

tokio = { version = "1.5.0", features = ["full"]}
tokio-postgres = "0.7.2"
1

There are 1 answers

2
user4815162342 On BEST ANSWER

The standard library Mutex is designed to hold a lock within the context of a single thread. Since the task can be picked up by a different thread, values that cross await points must be Send. Additionally, even if it did work, using a blocking mutex in an async program is a bad idea because acquiring the mutex can take arbitrarily long, during which time other tasks cannot run on the same executor thread.

You can fix both issues by switching to tokio's mutex whose guard is Send and whose lock() method is async. For example, this compiles:

// Pick up an async aware Mutex
use tokio::sync::Mutex;

let connect_result = Arc::new(Mutex::new(tokio_postgres::connect(config, NoTls).await?));
let connect_result_thread = connect_result.clone();
tokio::spawn(async move {
    let mut result = connect_result_thread.lock().await;
    if let Err(e) = (&mut result.1).await {
        eprintln!("An error occured while trying to connect to the database: {}", e);
    }
});

Ok(connect_result)

Playground

What remains unclear is the ultimate goal of this design. If the connection is behind a mutex, you are effectively serializing access to it from various tasks. Such a connection probably shouldn't be shared among tasks in the first place.