An error occurred while receiving data sent from the client. : remote_endpoint: Transport endpoint is not connected

105 views Asked by At

I have used Boost.Asio to create a server and a client that communicate over TCP. The server has no problem receiving data from a single client or from a small number of clients, but when it receives data from a large number of clients per second (approximately 100 or more), it eventually fails with the error “remote_endpoint: Transport endpoint is not connected”. I would like to fix this error.

This is my code

#include <array>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>

#include <boost/asio.hpp>
#include <mysql/mysql.h>
#include <nlohmann/json.hpp>

using namespace boost::asio;
using json = nlohmann::json;

class AsyncTCPServer {
public:
    // Constructor
    AsyncTCPServer(io_service& io_service, short port)
        : acceptor_(io_service, ip::tcp::endpoint(ip::tcp::v4(), port)),
        socket_(io_service) {
            StartAccept();
            ConnectToMariaDB();
    }
    // Destructor
    ~AsyncTCPServer() {
        // Disconnect from MariaDB
        if (db_connection_ != nullptr) {
            mysql_close(db_connection_);
            std::cout << "MariaDB connection closed" << std::endl;
        }
    }

private:
    // Start accepting new client connections and initiate asynchronous communication
    void StartAccept() {
        acceptor_.async_accept(socket_,
            [this](boost::system::error_code ec) {
                if(ec) {
                    std::cerr << "Error occurred during connection acceptance: " << ec.message() << std::endl;
                } else {
                    // Add the new client socket to the array for management
                    AddClient(std::make_shared<ip::tcp::socket>(std::move(socket_)));
                    // Wait for the next client connection
                    StartAccept();
                    // Initiate asynchronous communication for the current client
                    StartRead(clients_[num_clients_ - 1]);
                }
            });
    }
 
    // Add a new client to the array and print a connection message
    void AddClient(std::shared_ptr<ip::tcp::socket> client) {
        std::lock_guard<std::mutex> lock(mutex_);
    
        if (num_clients_ < max_clients) {
            clients_[num_clients_] = client;
            num_clients_++;
    
            // Print connection message
            std::cout << client->remote_endpoint().address().to_string() + " connected." << std::endl;
        } else {
            std::cerr << "Cannot accept connection, maximum number of clients exceeded." << std::endl;
            client->close();
        }
    }
 
    // Start asynchronous reading for the client
    void StartRead(std::shared_ptr<ip::tcp::socket> client) {
        auto& buffer = buffers_[client];  // Get the buffer associated with the client
        async_read_until(*client, buffer, '\n',
            [this, client, &buffer](boost::system::error_code ec, std::size_t length) {
                if (ec) {
                    RemoveClient(client);
                } else {
                    std::istream is(&buffer);
                    std::string message;
                    std::getline(is, message);
                    // Save to the database
                    SaveToDatabase(message);
                    StartRead(client);
                }
            });
    }

    // Connect to MariaDB
    void ConnectToMariaDB() {
        // Initialize MariaDB connection
        db_connection_ = mysql_init(nullptr);
        if (db_connection_ == nullptr) {
            std::cerr << "Failed to initialize MariaDB connection" << std::endl;
            exit(1);
        }

        const char* host = "localhost";
        const char* user = "root";
        const char* password = "1234";
        const char* database = "servertest";
        unsigned int port = 3306;

        if (mysql_real_connect(db_connection_, host, user, password, database, port, nullptr, 0) == nullptr) {
            std::cerr << "Failed to connect to MariaDB: " << mysql_error(db_connection_) << std::endl;
            exit(1);
        }

        std::cout << "MariaDB connection established" << std::endl;
    }

    // Save message to the database
    void SaveToDatabase(const std::string& message) {
       // Assume JSON and save to the database
       try {
          auto j = json::parse(message);
          std::lock_guard<std::mutex> lock(mutex_);
            
          if (j.find("CPU") != j.end()) {
             SaveCpuToDatabase(j["CPU"]);
          }
          if (j.find("NIC") != j.end()) {
             SaveNicToDatabase(j["NIC"]);
          }
          if (j.find("Memory") != j.end()) {
             SaveMemoryToDatabase(j["Memory"]);
          }
          if (j.find("Disk") != j.end()) {
             SaveDiskToDatabase(j["Disk"]);
          }
          std::cout << "Saved to the database" << std::endl;
       } catch (const nlohmann::detail::parse_error& e) { // Catch JSON parsing errors
          std::cerr << "JSON parsing error: " << e.what() << std::endl;
          std::cerr << "Error occurred in the message: " << message << std::endl;
       } catch (const std::exception& e) {
          std::cerr << "Failed to save to the database: " << e.what() << std::endl;
          std::cerr << "Error occurred in the message: " << message << std::endl;
       }
    }
 
    void SaveCpuToDatabase(const json& cpuData) {
        for (const auto& processor : cpuData) {
            // Extract information for each processor
            std::string cores = processor["Cores"].get<std::string>();
            std::string model = processor["Model"].get<std::string>();
            std::string siblings = processor["Siblings"].get<std::string>();
            
            // Generate query and save data to the DB
            std::string query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" + model + "', '" + siblings + "')";
            if (mysql_query(db_connection_, query.c_str()) != 0) {
                std::cerr << "Error occurred while saving CPU information to the database: " << mysql_error(db_connection_) << std::endl;
            }
        }
    }
 
    void SaveNicToDatabase(const json& nicData) {
        for (const auto& nic : nicData) {
            // Extract information for each NIC
            std::string interface = nic["Interface"].get<std::string>();
            std::string mac_address = nic["MAC Address"].get<std::string>();
            std::string operational_state = nic["Operational State"].get<std::string>();
            std::string speed = nic["Speed"].get<std::string>();

            // Generate query and save data to the DB
            std::string query = "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface + "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
            int queryResult = mysql_query(db_connection_, query.c_str());
            if (queryResult != 0) {
                std::cerr << "Error occurred while saving NIC information to the database: " << mysql_error(db_connection_) << std::endl;
            }
        }
    }
 
    void SaveMemoryToDatabase(const json& memoryData) {
        // Similar logic for saving memory information to the database
        // ...
    }
 
    void SaveDiskToDatabase(const json& diskData) {
        // Similar logic for saving disk information to the database
        // ...
    }
   
    // Remove client from the array and print a connection termination message
    void RemoveClient(std::shared_ptr<ip::tcp::socket> client) {
        for (int i = 0; i < max_clients; ++i) {
            if (clients_[i] == client) {
                clients_[i] = nullptr;
                num_clients_--;
                // Print connection termination message
                std::cout << client->remote_endpoint().address().to_string() + " connection terminated." << std::endl;

                boost::system::error_code ec;
                client->shutdown(ip::tcp::socket::shutdown_both, ec);
                client->close(ec);

                break;
            }
        }
    }

    ip::tcp::acceptor acceptor_; // TCP acceptor
    ip::tcp::socket socket_; // TCP socket
    static const int max_clients = 100000;  // Maximum number of clients
    std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_; // TCP sockets corresponding to the maximum number of clients
    int num_clients_ = 0; // Current number of connected clients
    std::mutex mutex_;
    // MariaDB connection handler
    MYSQL* db_connection_;
    std::map<std::shared_ptr<ip::tcp::socket>, streambuf> buffers_;  // Map to manage buffers for each client
};

int main() {
    std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
    try {
        boost::asio::io_service io_service; // Create io_service object
        AsyncTCPServer server(io_service, 12345); // Create an object of the AsyncTCPServer class (object, port number)
        io_service.run(); // Start the event loop
    } catch (std::exception& e) {
        std::cerr << "Exception caught: " << e.what() << std::endl;
        std::chrono::duration<double> sec = std::chrono::system_clock::now() - start;
        std::cout << "Time taken to run (seconds): " << sec.count() << " seconds" << std::endl;
    }

    return 0;
}
1

There is 1 answer

3
sehe On

The remote_endpoint() accessor raises an error if the socket is no longer connected. This includes when the peer closed the connection.

In general you cannot use it when an operation had terminated with an error condition.

The usual way to preserve the endpoint (e.g. for logging purposes) is to save it when a connection is newly accepted. So instead of

std::array<std::shared_ptr<ip::tcp::socket>, max_clients> clients_;

You might have

// Per-connection record: the socket plus the peer address, so that the
// address can still be logged after the connection has been lost.
struct Client {
    ip::tcp::socket   socket;          // the client connection
    ip::tcp::endpoint remote_endpoint; // peer address, to be saved when the connection is accepted
};

std::array<std::shared_ptr<Client>, max_clients> clients_;

Refactor

In fact, the way the code is organized seems more typical of C-style select/poll code, instead of typical Asio code. Client would be a perfect candidate to encapsulate all session-specific logic for your server.

Here I present your code refactored to separate concerns between

  • Database which knows how to save messages and owns a connection
  • Server which accepts socket connections and owns a Database instance
  • Session which owns a single client socket and related resources (like buffer), which gets a reference to Database for message processing

In smaller changes

  • it avoids streambuf with istream processing (costly)
  • it uses length explicitly (instead of implicitly through std::getline)
  • it avoids buffer copying in the process
  • it avoids unnecessary locking (your server is single-threaded, so there is an implicit strand)
  • I chose to keep the locking in Database so it remains thread-aware in case you use it in different places
  • I changed the manual array juggling to emulate a fixed-size container into boost flat_set over a static_vector. This removes several categories of memory management and exception safety bugs
  • Removed the "RemoveClient" which could easily look like
     // Removes session `c` from sessions_ and returns whatever erase() reports.
     size_t RemoveClient(std::shared_ptr<Session> const& c) {
         return sessions_.erase(c);
     }
    
    If you wanted
  • Instead we rely on enable_shared_from_this to govern the Session lifetime, meaning that we can remove expired sessions on demand.

Notes:

  • 100'000 connections is LARGE for a stack-allocated object. I'd probably not use a static container at all
  • 100'000 connections is NOT going to scale with a single MYSQL connection. Instead you probably want to queue messages to be consumed by several database workers that concurrently save to a database - preferably on a cluster so the load can be balanced
  • There's blatant SQL injection potential. Consider using Boost Mysql which supports async operations and prepared statements
  • Avoid using detail namespaces

Refactored Listing

  • File Database.h

     #pragma once
     #include <nlohmann/json.hpp>
     #include <string_view>
     using nlohmann::json;
    
     // Owns one MariaDB connection handle and stores incoming messages through it.
     struct Database {
         Database();  // connects to MariaDB (see ConnectToMariaDB)
         ~Database(); // closes the connection if one is open
    
         // Parses `message` as JSON and saves the sections it recognises.
         void SaveToDatabase(std::string_view message);
    
       private:
         // Connect to MariaDB
         void ConnectToMariaDB();
         // One helper per top-level JSON section handled by SaveToDatabase.
         void SaveCpuToDatabase(json const& cpuData);
         void SaveNicToDatabase(json const& nicData);
         void SaveMemoryToDatabase(json const& memoryData);
         void SaveDiskToDatabase(json const& diskData);
    
         // Lock taken by SaveToDatabase while it uses the handle
         std::mutex mutex_;
         // MariaDB connection handle; nullptr until a connection is initialised
         struct MYSQL* handle_ = nullptr;
     };
    
  • File Database.cpp

     #include "Database.h"
     #include <iostream>
     #include <mysql/mysql.h>
     using namespace std::string_literals;
    
     // Establishes the MariaDB connection as part of construction.
     Database::Database() {
         ConnectToMariaDB();
     }
     // Closes the MariaDB handle, if there is one, and logs the shutdown.
     Database::~Database() {
         if (handle_ == nullptr)
             return;

         ::mysql_close(handle_);
         std::cout << "MariaDB connection closed" << std::endl;
     }
    
     // Save message to the database
     void Database::SaveToDatabase(std::string_view message) {
         // Assume JSON and save to the database
         try {
             json            j = json::parse(message);
             std::lock_guard lock(mutex_);
    
             if (auto it = j.find("CPU"); it != j.end())
                 SaveCpuToDatabase(*it);
             if (auto it = j.find("NIC"); it != j.end())
                 SaveNicToDatabase(*it);
             if (auto it = j.find("Memory"); it != j.end())
                 SaveMemoryToDatabase(*it);
             if (auto it = j.find("Disk"); it != j.end())
                 SaveDiskToDatabase(*it);
    
             std::cout << "Saved to the database" << std::endl;
         } catch (nlohmann::json::parse_error const& e) { // Catch JSON parsing errors
             std::cerr << "JSON parsing error: " << e.what() << std::endl;
             std::cerr << "Error occurred in the message: " << message << std::endl;
         } catch (std::exception const& e) {
             std::cerr << "Failed to save to the database: " << e.what() << std::endl;
             std::cerr << "Error occurred in the message: " << message << std::endl;
         }
     }
    
     void Database::ConnectToMariaDB() {
         // Initialize MariaDB connection
         handle_ = ::mysql_init(nullptr);
         if (!handle_)
             throw std::runtime_error("Failed to initialize MariaDB connection");
    
         char const*  host     = "localhost";
         char const*  user     = "root";
         char const*  password = "1234";
         char const*  database = "servertest";
         unsigned int port     = 3306;
    
         if (::mysql_real_connect(handle_, host, user, password, database, port, nullptr, 0) == nullptr)
             throw std::runtime_error("Failed to connect to MariaDB: "s + ::mysql_error(handle_));
    
         std::cout << "MariaDB connection established" << std::endl;
     }
    
     // Inserts one row into cpu_table for every element of `cpuData`.
     // Each element must provide string fields "Cores", "Model" and "Siblings";
     // a missing or non-string field throws (nlohmann exception), as does a
     // failure to escape a value (std::runtime_error). A failing INSERT is
     // logged and the loop continues with the next element.
     void Database::SaveCpuToDatabase(json const& cpuData) {
         // Escapes a client-supplied value for use inside a quoted SQL literal.
         auto const escape = [this](std::string const& raw) {
             std::string   quoted(raw.size() * 2 + 1, '\0'); // size required by the C API
             unsigned long len = ::mysql_real_escape_string(handle_, quoted.data(), raw.data(),
                                                            static_cast<unsigned long>(raw.size()));
             if (len == static_cast<unsigned long>(-1))
                 throw std::runtime_error("Failed to escape value for SQL statement");
             quoted.resize(len);
             return quoted;
         };
    
         for (auto const& processor : cpuData) {
             // at() throws on a missing key; const operator[] would be undefined behaviour
             std::string cores    = escape(processor.at("Cores").get<std::string>());
             std::string model    = escape(processor.at("Model").get<std::string>());
             std::string siblings = escape(processor.at("Siblings").get<std::string>());
    
             // Generate query and save data to the DB
             std::string query = "INSERT INTO cpu_table (cores, model, siblings) VALUES ('" + cores + "', '" +
                 model + "', '" + siblings + "')";
             if (::mysql_query(handle_, query.c_str()) != 0) {
                 std::cerr << "Error occurred while saving CPU information to the database: "
                           << ::mysql_error(handle_) << std::endl;
             }
         }
     }
    
     // Inserts one row into nic_table for every element of `nicData`.
     // Each element must provide string fields "Interface", "MAC Address",
     // "Operational State" and "Speed"; a missing or non-string field throws
     // (nlohmann exception), as does a failure to escape a value
     // (std::runtime_error). A failing INSERT is logged and the loop continues.
     void Database::SaveNicToDatabase(json const& nicData) {
         // Escapes a client-supplied value for use inside a quoted SQL literal.
         auto const escape = [this](std::string const& raw) {
             std::string   quoted(raw.size() * 2 + 1, '\0'); // size required by the C API
             unsigned long len = ::mysql_real_escape_string(handle_, quoted.data(), raw.data(),
                                                            static_cast<unsigned long>(raw.size()));
             if (len == static_cast<unsigned long>(-1))
                 throw std::runtime_error("Failed to escape value for SQL statement");
             quoted.resize(len);
             return quoted;
         };
    
         for (auto const& nic : nicData) {
             // at() throws on a missing key; const operator[] would be undefined behaviour
             std::string interface         = escape(nic.at("Interface").get<std::string>());
             std::string mac_address       = escape(nic.at("MAC Address").get<std::string>());
             std::string operational_state = escape(nic.at("Operational State").get<std::string>());
             std::string speed             = escape(nic.at("Speed").get<std::string>());
    
             // Generate query and save data to the DB
             std::string query =
                 "INSERT INTO nic_table (interface, mac_address, operational_state, speed) VALUES ('" + interface +
                 "', '" + mac_address + "', '" + operational_state + "', '" + speed + "')";
             if (::mysql_query(handle_, query.c_str()) != 0) {
                 std::cerr << "Error occurred while saving NIC information to the database: "
                           << ::mysql_error(handle_) << std::endl;
             }
         }
     }
    
     // Placeholder: currently does nothing with the "Memory" section it receives.
     void Database::SaveMemoryToDatabase(json const& /*memoryData*/) {
         // Similar logic for saving memory information to the database
         // ...
     }
    
     // Placeholder: currently does nothing with the "Disk" section it receives.
     void Database::SaveDiskToDatabase(json const& /*diskData*/) {
         // Similar logic for saving disk information to the database
         // ...
     }
    

Bonus: C++23 and no dependencies

Just because I wanted to try Boost.MySQL for once, here is a version without the nlohmann OR mysqlclient dependencies, using C++20 coroutines, multithreaded DB workers and prepared statements to avoid SQL injection: https://coliru.stacked-crooked.com/a/a9276899dcd0a623

  • File Database.h

     #pragma once
     #include <boost/asio/awaitable.hpp>
     #include <boost/asio/experimental/concurrent_channel.hpp>
    
     // Channel whose messages carry an error_code and a std::string.
     using DbChannel = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, std::string)>;
     // Coroutine that consumes messages from `ch`; defined in Database.cpp.
     boost::asio::awaitable<void> DbWorker(DbChannel& ch);
    
  • File test.cpp

     #include "Database.h"
    
     #include <boost/asio.hpp>
     #include <iostream>
     #include <list>
     #include <syncstream>
    
     namespace asio = boost::asio;
     using asio::ip::tcp;
     using error_code = boost::system::error_code;
     using Socket     = asio::deferred_t::as_default_on_t<tcp::socket>;
     using Acceptor   = asio::deferred_t::as_default_on_t<tcp::acceptor>;
    
     // Temporary synchronized wrappers around std::cout / std::cerr, so that
     // output written from several threads is not interleaved mid-message.
     static auto out() { return std::osyncstream(std::cout); }
     static auto err() { return std::osyncstream(std::cerr); }
    
     // Coroutine serving one client: reads '\n'-terminated lines from `s` and
     // forwards each line (without the newline) to the `db` channel.
     // The loop has no normal exit; it ends when a read or send throws, which is
     // logged together with the peer address saved up front.
     asio::awaitable<void> Session(Socket s, DbChannel& db) {
         error_code    ec;
         tcp::endpoint ep = s.remote_endpoint(ec); // save it before the connection becomes broken
         out() << ep << " connection accepted." << std::endl;
         try {
             // buf persists across iterations, so bytes received after the
             // delimiter are kept for the next line
             for (std::string buf;;) {
                 // n counts the bytes up to and including the '\n'
                 auto n = co_await async_read_until(s, asio::dynamic_buffer(buf), '\n');
                 co_await db.async_send(error_code{}, buf.substr(0, n - 1), asio::deferred);
                 buf.erase(0, n); // drop the consumed line, keep the remainder
             }
         } catch (boost::system::system_error const& se) {
             err() << ep << " session failed: " << se.code().message() << std::endl;
         } catch (std::exception const& e) {
             err() << ep << " session failed: " << e.what() << std::endl;
         }
         out() << ep << " connection terminated." << std::endl;
     }
    
     // Accept loop: listens on `port` and spawns a detached Session coroutine
     // for every accepted connection. Any exception (e.g. from binding or from
     // accepting) ends the loop and is logged by the function-try-block handler.
     asio::awaitable<void> Server(short port, DbChannel& db) try {
         auto     ex = co_await asio::this_coro::executor;
         Acceptor acc(ex, tcp::endpoint({}, port));
    
         while (true)
             co_spawn(ex, Session(co_await acc.async_accept(), db), asio::detached);
     } catch (std::exception const& e) {
         err() << "Accept error: " << e.what() << std::endl;
     }
    
     // Entry point: starts the accept loop and the database worker(s) on a
     // thread pool, stops on SIGINT/SIGTERM, and always reports the run time.
     int main() {
         using namespace std::chrono_literals;
         auto now   = std::chrono::system_clock::now;
         auto start = now();
         try {
             asio::thread_pool ioc;
             // presumably 5 is the channel's buffer capacity — confirm against the Asio docs
             DbChannel         channel(ioc, 5);
    
             co_spawn(ioc, Server(12345, channel), asio::detached);
    
             for (int i = 0; i < 1; ++i) // multiple workers are fine
                 co_spawn(ioc, DbWorker(channel), asio::detached);
    
             // stop the pool when the process is asked to terminate
             asio::signal_set ss(ioc, SIGINT, SIGTERM);
             ss.async_wait([&ioc](error_code ec, int sig) {
                 if (!ec) {
                     err() << "received signal " << strsignal(sig) << std::endl;
                     ioc.stop();
                 }
             });
    
             ioc.join(); // wait for event loop
         } catch (std::exception const& e) {
             err() << "Exception caught: " << e.what() << std::endl;
         }
         // dividing by 1.s turns the duration into a plain number of seconds
         out() << "Time taken to run (seconds): " << (now() - start) / 1.s << " seconds" << std::endl;
         out() << "Bye" << std::endl;
     }
    
  • File Database.cpp

     #include "Database.h"
     #include <boost/asio.hpp>
     #include <boost/json/src.hpp> // for header-only
     #include <boost/mysql.hpp>
     #include <iostream>
     #include <string_view>
     #include <syncstream>
    
     // File-local aliases and logging helpers used by DbWorker below.
     namespace /*file static*/ {
         namespace asio = boost::asio;
         namespace my   = boost::mysql;
         namespace ssl  = asio::ssl;
         using asio::ip::tcp;
         using boost::system::error_code;
         using namespace std::string_literals;
         using json = boost::json::value;
    
         // MySQL connection type with asio::deferred as its default completion token
         using My = asio::deferred_t::as_default_on_t<my::tcp_ssl_connection>;
    
         // synchronized wrappers around std::cout / std::cerr
         auto out() { return std::osyncstream(std::cout); }
         auto err() { return std::osyncstream(std::cerr); }
     } // namespace
    
     // Database worker coroutine: opens its own connection, prepares the INSERT
     // statements once, then receives messages from `ch` until a receive fails,
     // parsing each message as JSON and saving its "CPU" and "NIC" sections.
     asio::awaitable<void> DbWorker(DbChannel& ch) {
         auto ex       = co_await asio::this_coro::executor;
         auto resolver = asio::deferred_t::as_default_on(tcp::resolver(ex));
    
         char const* host     = "localhost";
         char const* user     = "root";
         char const* password = "root"; // "1234";
         char const* database = "test"; // "servertest";
         char const* port     = "3306";
    
         // with_ec stores the result of an operation in `ec` instead of throwing
         error_code ec;
         auto       with_ec = redirect_error(asio::deferred, ec);
    
         ssl::context ssl_ctx{ssl::context::tls_client};
         My conn{ex, ssl_ctx};
         // connects to the first endpoint the resolver returns
         co_await conn.async_connect(*(co_await resolver.async_resolve(host, port)).begin(),
                                     {user, password, database}, with_ec);
         // NOTE(review): a failed connect is only logged here; the code goes on to
         // prepare statements on the same connection regardless — confirm intended.
         out() << "MariaDB connection: " << ec.message() << std::endl;
    
         // prepared statements
         auto cpu_stmt = co_await conn.async_prepare_statement(
             "INSERT INTO cpu_table (cores, model, siblings) VALUES (?,?,?)", with_ec);
         out() << "CPU prepared statement: " << ec.message() << std::endl;
         auto nic_stmt = co_await conn.async_prepare_statement("INSERT INTO nic_table (interface, mac_address, "
                                                                "operational_state, speed) VALUES (?,?,?,?)",
                                                                with_ec);
         out() << "NIC prepared statement: " << ec.message() << std::endl;
         ec.clear();
    
         for (;;) {
             // a receive error leaves the loop through the `else break` below
             if (std::string message = co_await ch.async_receive(with_ec); !ec) {
                 try {
                     auto doc = boost::json::parse(message, ec);
                     auto j   = doc.if_object();
                     // reject both malformed JSON and valid JSON that is not an object
                     if (ec.failed() || !j) {
                         err() << "JSON parsing error: " << ec.message() << std::endl;
                         err() << "Error occurred in the message: " << message << std::endl;
                         ec.clear();
                         continue;
                     }
    
                     // Runs `stmt` once per element of the array stored under `name`,
                     // binding the string values found at `keys` in order.
                     auto save = [&](auto name, auto& stmt, auto... keys) -> asio::awaitable<void> {
                         my::results r;
                         if (auto node = j->if_contains(name))
                             for (auto const& el : node->as_array()) {
                                 co_await conn.async_execute(stmt.bind(el.at(keys).as_string()...), r, with_ec);
                                 if (ec)
                                     err() << "Error saving " << name << " information: " << ec.message() << std::endl;
                             }
                     };
    
                     co_await save("CPU", cpu_stmt, "Cores", "Model", "Siblings");
                     co_await save("NIC", nic_stmt, "Interface", "MAC Address", "Operational State", "Speed");
                     // NOTE(review): if the last execute above failed, `ec` still holds
                     // that error here; verify it cannot be mistaken for a receive error.
                     if ([[maybe_unused]] auto mem = j->if_contains("Memory")) {
                         // Similar logic for saving memory information to the database
                     }
                     if ([[maybe_unused]] auto disk = j->if_contains("Disk")) {
                         // Similar logic for saving disk information to the database
                     }
    
                     out() << "Saved to the database" << std::endl;
                 } catch (std::exception const& e) {
                     // e.g. a missing key or a non-string/non-array value in the message
                     err() << "Failed to save to the database: " << e.what() << std::endl;
                     err() << "Error occurred in the message: " << message << std::endl;
                 }
             } else
                 break;
         }
         co_await conn.async_close(with_ec);
         out() << "MariaDB connection closed: " << ec.message() << std::endl;
     }
    

Interactive demo

enter image description here