I have an SSE endpoint built with Axum, and I want to send an event every second with a snapshot of a table in my Postgres database. My current code for this:
    pub async fn payment_sse_handler(
        State(mut state): State<AppState>,
        Extension(current_user): Extension<CurrentUser>,
    ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
        let stream =
            stream::repeat_with(move || get_balance(&state.db_pool, current_user.user_id.clone()).await) // <-- Error here
                .map(Ok)
                .throttle(Duration::from_secs(1));
        Sse::new(stream).keep_alive(KeepAlive::default())
    }

    pub async fn get_balance(db_pool: &Pool<Postgres>, user_id: String) -> Event {
        let transaction_history =
            sqlx::query_as!(
                Transaction,
                r#"select transaction_id, status, currency, amount, user_id, created_at, processing_fee
                from transaction_history
                WHERE user_id=$1
                ORDER BY created_at
                LIMIT 5"#,
                user_id
            )
            .fetch_all(db_pool)
            .await;
        match transaction_history {
            Ok(transaction_history) => {
                match serde_json::to_string(&transaction_history) {
                    Ok(transaction_history_str) => Event::default().data(transaction_history_str),
                    Err(_) => Event::default().data("Error"),
                }
            }
            Err(_) => Event::default().data("Error"),
        }
    }
The problem is that this code gives me the error: "`await` is only allowed inside `async` functions and blocks". I think I get the error because `||` creates a closure that is not async, but when I try to make it async I just get new errors. Does anyone have an idea how to fix this, or am I doing something fundamentally wrong with this approach?
`repeat_with()` takes a synchronous closure, not an asynchronous one, so you cannot `.await` inside it. To build a stream from an asynchronous function, you can use `unfold()`.
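A minimal sketch of the `unfold()` approach, assuming `AppState`, `CurrentUser`, and `get_balance()` are the ones from the question, and that `throttle()` comes from `tokio_stream::StreamExt` (with the `time` feature enabled):

```rust
use std::{convert::Infallible, time::Duration};

use axum::{
    extract::State,
    response::sse::{Event, KeepAlive, Sse},
    Extension,
};
use futures::stream::{self, Stream};
use tokio_stream::StreamExt as _; // assumed source of `.throttle()`

// `AppState`, `CurrentUser` and `get_balance` are the question's own items.
pub async fn payment_sse_handler(
    State(state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // `unfold` threads a seed value through an async closure. Here the seed is
    // just the data each query needs, handed back unchanged after every item.
    let stream = stream::unfold((state, current_user), |(state, current_user)| async move {
        let event = get_balance(&state.db_pool, current_user.user_id.clone()).await;
        // Returning `Some((item, seed))` yields `item` and keeps the stream alive.
        Some((Ok::<_, Infallible>(event), (state, current_user)))
    })
    .throttle(Duration::from_secs(1));

    Sse::new(stream).keep_alive(KeepAlive::default())
}
```

Because the closure always returns `Some`, the stream never ends by itself; it stops when the client disconnects and Axum drops it.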
However, note that this sends responses with a delay of 1 second between them, rather than one response every second. For example, if `get_balance()` takes 0.5s to execute, this will send a response every 1.5s. If you don't want that, you can use `tokio::time::interval()` wrapped in an `IntervalStream`.
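A sketch of the interval-based variant, under the same assumptions as the question (`AppState`, `CurrentUser`, and `get_balance()` come from there, and `user_id` is a `String`). It uses `IntervalStream` from `tokio_stream::wrappers` and `then()` from `tokio_stream::StreamExt`:

```rust
use std::{convert::Infallible, time::Duration};

use axum::{
    extract::State,
    response::sse::{Event, KeepAlive, Sse},
    Extension,
};
use futures::stream::Stream;
use tokio::time::interval;
use tokio_stream::{wrappers::IntervalStream, StreamExt as _};

// `AppState`, `CurrentUser` and `get_balance` are the question's own items.
pub async fn payment_sse_handler(
    State(state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let db_pool = state.db_pool.clone();
    let user_id = current_user.user_id.clone();

    // The interval ticks once per second (the first tick fires immediately),
    // and each tick is mapped to one database query.
    let stream = IntervalStream::new(interval(Duration::from_secs(1))).then(move |_tick| {
        // Clone per tick so the returned future owns everything it uses.
        let db_pool = db_pool.clone();
        let user_id = user_id.clone();
        async move { Ok::<_, Infallible>(get_balance(&db_pool, user_id).await) }
    });

    Sse::new(stream).keep_alive(KeepAlive::default())
}
```

If a query ever takes longer than the period, the default behavior of `tokio::time::interval()` is to fire the missed ticks back to back; calling `set_missed_tick_behavior(MissedTickBehavior::Delay)` on the interval before wrapping it is one way to avoid that burst.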