I have used Boost.Asio to create a server and a client that communicate over TCP. The server has no problem receiving data from a single client or a small number of clients, but when it receives data from a large number of clients per second (approximately 100 or more), it eventually fails with the error “remote_endpoint: The sending endpoint is not connected”. I would like to fix this error.
This is my code
#include <array>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <istream>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <boost/asio.hpp>
#include <mysql/mysql.h>
#include <nlohmann/json.hpp>
using namespace boost::asio;
using json = nlohmann::json;
class AsyncTCPServer {
public:
// Constructor
AsyncTCPServer(io_service& io_service, short port)
: acceptor_(io_service, ip::tcp::endpoint(ip::tcp::v4(), port)),
socket_(io_service) {
StartAccept();
ConnectToMariaDB();
}
// Destructor
~AsyncTCPServer() {
// Disconnect from MariaDB
if (db_connection_ != nullptr) {
mysql_close(db_connection_);
std::cout << "MariaDB connection closed" << std::endl;
}
}
private:
// Start accepting new client connections and initiate asynchronous communication
void StartAccept() {
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec) {
if(ec) {
std::cerr << "Error occurred during connection acceptance: " << ec.message() << std::endl;
} else {
// Add the new client socket to the array for management
AddClient(std::make_shared<ip::tcp::socket>(std::move(socket_)));
// Wait for the next client connection
StartAccept();
// Initiate asynchronous communication for the current client
StartRead(clients_[num_clients_ - 1]);
}
});
}
// Add a new client to the array and print a connection message
void AddClient(std::shared_ptr<ip::tcp::socket> client) {
std::lock_guard<std::mutex> lock(mutex_);
if (num_clients_ < max_clients) {
clients_[num_clients_] = client;
num_clients_++;
// Print connection message
std::cout << client->remote_endpoint().address().to_string() + " connected." << std::endl;
} else {
std::cerr << "Cannot accept connection, maximum number of clients exceeded." << std::endl;
client->close();
}
}
// Start asynchronous reading for the client
void StartRead(std::shared_ptr<ip::tcp::socket> client) {
auto& buffer = buffers_[client]; // Get the buffer associated with the client
async_read_until(*client, buffer, '\n',
[this, client, &buffer](boost::system::error_code ec, std::size_t length) {
if (ec) {
RemoveClient(client);
} else {
std::istream is(&buffer);
std::string message;
std::getline(is, message);
// Save to the database
SaveToDatabase(message);
StartRead(client);
}
});
}
// Connect to MariaDB
void ConnectToMariaDB() {
// Initialize MariaDB connection
db_connection_ = mysql_init(nullptr);
if (db_connection_ == nullptr) {
std::cerr << "Failed to initialize MariaDB connection" << std::endl;
exit(1);
}
const char* host = "localhost";
const char* user = "root";
const char* password = "1234";
const char* database = "servertest";
unsigned int port = 3306;
if (mysql_real_connect(db_connection_, host, user, password, database, port, nullptr, 0) == nullptr) {
std::cerr << "Failed to connect to MariaDB: " << mysql_error(db_connection_) << std::endl;
exit(1);
}
std::cout << "MariaDB connection established" << std::endl;
}
// Save message to the database
void SaveToDatabase(const std::string& message) {
// Assume JSON and save to the database
try {
auto j = json::parse(message);
std::lock_guard<std::mutex> lock(mutex_);
if (j.find("CPU") != j.end()) {
SaveCpuToDatabase(j["CPU"]);
}
if (j.find("NIC") != j.end()) {
SaveNicToDatabase(j["NIC"]);
}
if (j.find("Memory") != j.end()) {
SaveMemoryToDatabase(j["Memory"]);
}
if (j.find("Disk") != j.end()) {
SaveDiskToDatabase(j["Disk"]);
}
std::cout << "Saved to the database" << std::endl;
} catch (const nlohmann::detail::parse_error& e) { // Catch JSON parsing errors
std::cerr << "JSON parsing error: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
} catch (const std::exception& e) {
std::cerr << "Failed to save to the database: " << e.what() << std::endl;
std::cerr << "Error occurred in the message: " << message << std::endl;
}
}
void SaveCpuToDatabase(const json& cpuData) {
for (const auto& processor : cpuData) {
// Extract information for each processor
std::string cores = processor["Cores"].get<std::string>();
std::string model = processor["Model"].get<std::string>();
std::string siblings = processor["Siblings"].get<std::string>();
// Generate query and save data to the DB
std::string query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" + model + "', '" + siblings + "')";
if (mysql_query(db_connection_, query.c_str()) != 0) {
std::cerr << "Error occurred while saving CPU information to the database: " << mysql_error(db_connection_) << std::endl;
}
}
}
void SaveNicToDatabase(const json& nicData) {
for (const auto& nic : nicData) {
// Extract information for each NIC
std::string interface = nic["Interface"].get<std::string>();
std::string mac_address = nic["MAC Address"].get<std::string>();
std::string operational_state = nic["Operational State"].get<std::string>();
std::string speed = nic["Speed"].get<std::string>();
// Generate query and save data to the DB
std::string query = "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface + "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
int queryResult = mysql_query(db_connection_, query.c_str());
if (queryResult != 0) {
std::cerr << "Error occurred while saving NIC information to the database: " << mysql_error(db_connection_) << std::endl;
}
}
}
void SaveMemoryToDatabase(const json& memoryData) {
// Similar logic for saving memory information to the database
// ...
}
void SaveDiskToDatabase(const json& diskData) {
// Similar logic for saving disk information to the database
// ...
}
// Remove client from the array and print a connection termination message
void RemoveClient(std::shared_ptr<ip::tcp::socket> client) {
for (int i = 0; i < max_clients; ++i) {
if (clients_[i] == client) {
clients_[i] = nullptr;
num_clients_--;
// Print connection termination message
std::cout << client->remote_endpoint().address().to_string() + " connection terminated." << std::endl;
boost::system::error_code ec;
client->shutdown(ip::tcp::socket::shutdown_both, ec);
client->close(ec);
break;
}
}
}
ip::tcp::acceptor acceptor_; // TCP acceptor
ip::tcp::socket socket_; // TCP socket
static const int max_clients = 100000; // Maximum number of clients
std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_; // TCP sockets corresponding to the maximum number of clients
int num_clients_ = 0; // Current number of connected clients
std::mutex mutex_;
// MariaDB connection handler
MYSQL* db_connection_;
std::map<std::shared_ptr<ip::tcp::socket>, streambuf> buffers_; // Map to manage buffers for each client
};
int main() {
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
try {
boost::asio::io_service io_service; // Create io_service object
AsyncTCPServer server(io_service, 12345); // Create an object of the AsyncTCPServer class (object, port number)
io_service.run(); // Start the event loop
} catch (std::exception& e) {
std::cerr << "Exception caught: " << e.what() << std::endl;
std::chrono::duration<double> sec = std::chrono::system_clock::now() - start;
std::cout << "Time taken to run (seconds): " << sec.count() << " seconds" << std::endl;
}
return 0;
}
The `remote_endpoint()` accessor raises an error if the socket is no longer connected. This includes the case where the peer has closed the connection. In general, you cannot use it once an operation has terminated with an error condition.
The usual way to preserve the endpoint (e.g. for logging purposes) is to save it when a connection is newly accepted. So instead of
You might have
Refactor
In fact, the way the code is organized seems more typical of C-style select/poll code, instead of typical Asio code.
`Client` would be a perfect candidate to encapsulate all session-specific logic for your server. Here I present your code refactored to separate concerns between:
- `Database`, which knows how to save messages and owns a connection
- `Server`, which accepts socket connections and owns a `Database` instance
- `Session`, which owns a single client socket and related resources (like the buffer), and which gets a reference to `Database` for message processing

In smaller changes:
- `streambuf` with `istream` processing (costly)
- `length` explicitly (instead of implicitly through `std::getline`)
- `Database` so it remains thread-aware in case you use it in different places
- `flat_set` over a `static_vector`. This removes several categories of memory management and exception safety bugs
- `enable_shared_from_this` to govern the `Session` lifetime, meaning that we can remove expired sessions on demand.

Notes:
- `MYSQL` connection. Instead you probably want to queue messages to be consumed by several database workers that concurrently save to a database - preferably on a cluster so the load can be balanced
- `detail` namespaces

Refactored Listing
File
Database.h
File
Database.cpp
Bonus: C++23 and no dependencies
Just because I wanted to try Boost.MySQL for once, here is a version without the nlohmann or mysqlclient dependencies, using C++20 coroutines, multithreaded DB workers and prepared statements to avoid SQL injection: https://coliru.stacked-crooked.com/a/a9276899dcd0a623
File
Database.h
File
test.cpp
File
Database.cpp
Interactive demo