How does one use `sqlx` with `juniper` subscriptions in Rust?


Background:

I am having trouble integrating sqlx with juniper subscriptions.

I am getting a Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>> from sqlx::query::QueryAs::fetch().

juniper needs subscriptions to be returned as Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>.

Note the change from Result<User, sqlx::Error> to Result<User, juniper::FieldError>. Using map_err() from futures::TryStreamExt, I created the following code to perform the query and transform the error type.

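use std::pin::Pin;

use futures::{Stream, StreamExt, TryStreamExt};
use juniper::{graphql_value, FieldError};

// The stream type juniper expects a subscription to return.
type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            // Convert sqlx::Error into juniper::FieldError.
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}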

This fails with the following error on compile:

error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| {
66 | |                 FieldError::new(
...  |
69 | |             })
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error

I am not familiar enough with Streams or lifetimes to understand the implications of this error. After looking into this some more, it seems that ref_e is the lifetime of the subscription's reference to juniper's Executor.

Attempts:

Versions:

  • sqlx-0.4.1
  • juniper pinned to commit cd66bdb on master

There are 2 answers

Mathieu On

Your code is not exactly like mine, but I think the same solution applies here: clone the pool before using it.

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let pool = context.pool.clone();
        sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}

Sarowar On

The answer by Mathieu is correct. So I'll explain the reason behind the error.

The fundamental issue is that returning UsersStream moves data out of fn users(..). The caller now owns the returned value and can, in principle, keep it for as long as it likes, including for the entire lifetime of the application. The return type says exactly that: a Box<dyn Stream<...> + Send> with no explicit lifetime is implicitly 'static, so the stream must not contain any borrow that could expire. If UsersStream held a reference to data with a limited lifetime (here, context.pool), that reference would dangle as soon as the owner of pool dropped it. The compiler rejects the code to rule out that situation.

The fix is to give the stream owned data (pool) instead of a reference, so that the pool is guaranteed to live as long as UsersStream does. clone() does exactly this: it creates an owned copy of the data, which is either a cheap reference-counted handle or a full copy, depending on the type. As far as I know, sqlx documents its Pool as the former, so cloning it is cheap.
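To make the ownership argument concrete, here is a minimal sketch that uses only the standard library. It is an analogy, not the sqlx or juniper API: Pool, Context, Users and users_owned are hypothetical stand-ins, and a boxed Iterator plays the role of the boxed Stream. The commented-out function borrows from the context and is rejected for the same reason as the code in the question; the second one clones the handle and moves the clone into the returned value.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a connection pool: a cheaply clonable,
// reference-counted handle to shared state.
#[derive(Clone)]
struct Pool {
    rows: Arc<Vec<String>>,
}

// Hypothetical stand-in for the GraphQL context that owns the pool.
struct Context {
    pool: Pool,
}

// A boxed trait object with no explicit lifetime is implicitly `'static`,
// just like the boxed stream type in the question.
type Users = Box<dyn Iterator<Item = String> + Send>;

// Does not compile: the returned iterator would borrow from `context`,
// which does not live for `'static`.
//
// fn users_borrowed(context: &Context) -> Users {
//     Box::new(context.pool.rows.iter().cloned())
// }

// Compiles: the cloned handle is moved into the closure, so the returned
// iterator owns everything it needs and borrows nothing from `context`.
fn users_owned(context: &Context) -> Users {
    let pool = context.pool.clone();
    Box::new((0..pool.rows.len()).map(move |i| pool.rows[i].clone()))
}

fn main() {
    let users = {
        let context = Context {
            pool: Pool {
                rows: Arc::new(vec!["alice".to_string(), "bob".to_string()]),
            },
        };
        let users = users_owned(&context);
        users
        // `context` is dropped here; the iterator keeps its own handle.
    };

    let names: Vec<String> = users.collect();
    assert_eq!(names, vec!["alice", "bob"]);
}
```

The iterator outlives the context it was created from because the Arc inside the cloned handle keeps the shared data alive. That is the property the 'static bound on the return type asks for.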