I'm experiencing "connection reset by peer" errors when making multiple concurrent requests to my Rust TCP server. The server seems to handle some requests successfully, but others result in the connection being reset unexpectedly.
Here's my server code:

use crate::request::Request;
use crate::response::Response;
use crate::router::Router;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

pub struct Server {
    address: String,
    router: Arc<Router>,
}

impl Server {
    pub fn new(address: String, router: Router) -> Self {
        Server {
            address,
            router: Arc::new(router),
        }
    }

    pub async fn run(self) {
        let listener = TcpListener::bind(&self.address).await.expect("Failed to bind");
        println!("Server listening on {}", self.address);
        loop {
            let (mut stream, _) = match listener.accept().await {
                Ok(connection) => connection,
                Err(e) => {
                    eprintln!("Connection failed: {}", e);
                    continue;
                }
            };
            let router = Arc::clone(&self.router);
            tokio::spawn(async move {
                let mut buffer = Vec::new();
                let mut read_buffer = [0; 1024];
                let mut headers_ended = false;
                while let Ok(n) = stream.read(&mut read_buffer).await {
                    if n == 0 { break; }
                    buffer.extend_from_slice(&read_buffer[..n]);
                    // Check if we've encountered the end of the headers section
                    if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
                        headers_ended = true;
                        break;
                    }
                }
                if !headers_ended {
                    eprintln!("Failed to read the complete headers.");
                    return;
                }
                let request_str = String::from_utf8_lossy(&buffer);
                // Split the request string into headers and body parts
                let mut parts = request_str.split("\r\n\r\n");
                let headers_part = parts.next().unwrap_or_default();
                let body_part = parts.next().unwrap_or_default();
                let request = Request::new(headers_part.to_string(), body_part.to_string());
                let response = router.handle(request).await;
                let _ = stream.write(response.to_string().as_bytes()).await;
                let _ = stream.flush().await;
                let _ = stream.shutdown().await;
            });
        }
    }
}

And here's the code I used to test the server with concurrent requests:

use reqwest::Client;
use serde_json::json;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let client = Client::new();
    let send_req = |client: Client| async move {
        client.post("http://127.0.0.1:8080/api/create/user").json(&json!({
            "name": "John",
            "age": 30
        })).send().await?.text().await
    };
    let mut set = JoinSet::new();
    for _ in 0..256 {
        set.spawn(send_req(client.clone()));
    }
    while let Some(x) = set.join_next().await {
        println!("{x:?}")
    }
}

I suspect the issue might be related to how I'm handling the requests and connections in the server code, but I'm not sure what exactly is causing the problem.
Here are a few things I've considered:
Buffer Handling: I'm using a fixed-size buffer of 1024 bytes to read the request data. Could this be causing issues if the request is larger than the buffer size?
Connection Handling: After handling a request, I immediately shut down the connection with stream.shutdown().await. Is this the correct approach, or should I keep the connection open for a short time so the client can close it properly?
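One more thing I have not ruled out, which touches both points above: my read loop stops as soon as it sees the end of the headers, so part of the JSON body may still be unread when the stream is closed. As far as I understand, closing a TCP socket that still has unread data in its receive buffer can make the OS send a reset instead of a normal close. Below is a minimal sketch of how I could check that a request is complete before responding. It assumes the client always sends a Content-Length header (no chunked encoding), and the helper names header_end, content_length, and request_complete are hypothetical, not part of my server:

```rust
/// Index just past the "\r\n\r\n" that terminates the header block, if present.
fn header_end(buf: &[u8]) -> Option<usize> {
    buf.windows(4).position(|w| w == b"\r\n\r\n").map(|i| i + 4)
}

/// Content-Length declared in the raw header text.
/// Assumption: a missing or unparsable header is treated as "no body" (0).
fn content_length(headers: &str) -> usize {
    headers
        .lines()
        .filter_map(|line| line.split_once(':'))
        .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
        .and_then(|(_, value)| value.trim().parse::<usize>().ok())
        .unwrap_or(0)
}

/// True once `buf` holds the whole header block plus the whole declared body.
fn request_complete(buf: &[u8]) -> bool {
    match header_end(buf) {
        Some(end) => {
            let headers = String::from_utf8_lossy(&buf[..end]);
            buf.len() >= end + content_length(&headers)
        }
        None => false,
    }
}

fn main() {
    let body = br#"{"name":"John","age":30}"#;
    let head = format!(
        "POST /api/create/user HTTP/1.1\r\nContent-Length: {}\r\n\r\n",
        body.len()
    );

    // Headers alone are not a complete request: the body is still pending.
    let mut buf = head.into_bytes();
    assert!(!request_complete(&buf));

    // After the body arrives, the request is complete.
    buf.extend_from_slice(body);
    assert!(request_complete(&buf));
}
```

In the server, the idea would be to keep calling stream.read and appending to buffer until request_complete(&buffer) returns true, instead of breaking at the first "\r\n\r\n".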
I've tried increasing the buffer size and adding error handling, but the issue persists. I'm not sure what I'm missing.
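A smaller thing I noticed while re-reading the code: I call stream.write(...) and ignore the result. Like std's Write::write, tokio's AsyncWriteExt::write may accept only part of the buffer and returns how many bytes it took, whereas write_all keeps going until everything is sent. This is probably not what causes the resets with such small responses, but here is a sketch of the difference using std::io::Write as a stand-in for the socket. The Trickle type and the send_once / send_all helpers are hypothetical, made up only for this illustration:

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `chunk` bytes per `write` call,
/// imitating a socket that takes only part of the buffer.
struct Trickle {
    sent: Vec<u8>,
    chunk: usize,
}

impl Write for Trickle {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.chunk);
        self.sent.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// A single `write` call: returns how many bytes were actually accepted.
fn send_once(response: &[u8], chunk: usize) -> io::Result<usize> {
    let mut w = Trickle { sent: Vec::new(), chunk };
    w.write(response)
}

/// `write_all` loops internally until the whole buffer has been accepted.
fn send_all(response: &[u8], chunk: usize) -> io::Result<Vec<u8>> {
    let mut w = Trickle { sent: Vec::new(), chunk };
    w.write_all(response)?;
    Ok(w.sent)
}

fn main() -> io::Result<()> {
    let response = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok";

    // One write call may send only a prefix of the response.
    assert_eq!(send_once(response, 8)?, 8);

    // write_all delivers every byte, however small the chunks are.
    assert_eq!(send_all(response, 8)?, response.to_vec());
    Ok(())
}
```

In the async server the equivalent change would be stream.write_all(...).await from AsyncWriteExt, and handling the returned Result instead of discarding it.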