I am working on a C++ client/server application with standalone Asio (without Boost). I want the client application to continuously receive responses from the server and to send requests whenever the user triggers one. However, I do not know where exactly I should put asio::io_context::run in my code so that it keeps working for as long as the application runs.

In my application I also use the Dear ImGui library for the GUI-related stuff, which runs in a separate thread; Asio runs in the main thread. I have noticed that some people run the io_context in another thread right after calling asio::async_connect, which invokes the first async_read operation to prime Asio with some work to do and avoid premature stops. I tried the same technique in my code, but it still exits prematurely. I am including the relevant part of my code: the main function and the client class.

In this program, the client first sends the size of the request terminated by a specified delimiter, and then the request itself. The server reads the request size with asio::async_read_until up to that delimiter, prepares a request buffer of the given size, and then reads the request itself. The server sends back the response size and the response in the same way.
class TCPClient
{
public:
    TCPClient(std::string& IP, std::string& PORT);
    ~TCPClient();
    void push_request(std::string& request);
    void connect();

private:
    void start_read();
    void process_write();

    asio::io_context context;
    asio::io_context::work work;
    std::shared_ptr<tcp::socket> socket;
    std::shared_ptr<asio::streambuf> buffer;
    std::queue<std::string> requests;
    const std::string delimeter = "\n";
    std::thread io_thread;
    tcp::resolver resolver;
    tcp::resolver::results_type endpoints;
};
int main()
{
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();
    std::thread gui_thread;
    try
    {
        std::string IP = "IP";
        std::string PORT = "PORT";
        TCPClient client(IP, PORT);
        gui_thread = std::thread([&client] {
            if (!std::filesystem::exists("public.pem"))
            {
                std::string request = "key";
                client.push_request(request);
            }
            std::unique_lock<std::mutex> lock(options.mutex);
            options.condition.wait(lock, [] { return std::filesystem::exists("public.pem"); });
            login_register_window(client);
            if (options.is_logged && options.is_retrieved)
            {
                main_window(client);
            }
            else
            {
                std::cout << "Application should stop\n";
                client.~TCPClient();
            }
        });
        client.connect();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << '\n';
    }
    gui_thread.join();
    return 0;
}
TCPClient::TCPClient(std::string& IP, std::string& PORT)
    : socket(std::make_shared<tcp::socket>(context)),
      buffer(std::make_shared<asio::streambuf>()), resolver(context),
      endpoints(resolver.resolve(IP, PORT)), work(context)
{ }

TCPClient::~TCPClient()
{
    context.stop();
    io_thread.join();
}
void TCPClient::connect()
{
    asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (ec)
        {
            std::cerr << "Error connecting: " << ec.message() << '\n';
        }
    });
    start_read();
    io_thread = std::thread([this]() { context.run(); });
}
void TCPClient::push_request(std::string& request)
{
    bool is_queue_empty = false;
    {
        std::unique_lock<std::mutex> lock(options.mutex);
        is_queue_empty = requests.empty();
        requests.push(request);
    }
    options.condition.notify_one();
    if (is_queue_empty)
        process_write();
}
void TCPClient::process_write()
{
    asio::post(context, [this]() {
        std::unique_lock<std::mutex> lock(options.mutex);
        if (!requests.empty()) {
            std::string request = requests.front();
            requests.pop();
            bool is_queue_empty = requests.empty();
            lock.unlock();
            std::string request_size = std::to_string(request.size()) + delimeter;
            asio::async_write(*socket, asio::buffer(request_size),
                [this, request = std::move(request), is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                    if (!ec) {
                        std::cout << "Request size (" << bytes << " bytes) was sent.\n";
                        asio::async_write(*socket, asio::buffer(request),
                            [this, is_queue_empty = std::move(is_queue_empty)](const std::error_code& ec, std::size_t bytes) {
                                if (!ec) {
                                    std::cout << "Request (" << bytes << " bytes) was sent.\n";
                                    if (!is_queue_empty)
                                        process_write(); // Initiate the next write operation
                                }
                                else {
                                    std::cerr << "Error sending request: " << ec.message() << '\n';
                                }
                            });
                    }
                    else {
                        std::cerr << "Error sending request size: " << ec.message() << '\n';
                    }
                });
        }
    });
}
void TCPClient::start_read()
{
    asio::async_read_until(*socket, *buffer, delimeter,
        [this](const std::error_code& ec, std::size_t bytes) {
            if (!ec) {
                std::istream input_stream(buffer.get());
                std::string response_size_str;
                std::getline(input_stream, response_size_str);
                buffer->consume(bytes);
                response_size_str = response_size_str.substr(0, response_size_str.find(delimeter));
                int response_size = std::stoi(response_size_str);
                asio::async_read(*socket, asio::buffer(buffer->prepare(response_size), response_size),
                    [this](const std::error_code& ec, std::size_t bytes) {
                        if (!ec)
                        {
                            std::istream input_stream(buffer.get());
                            std::string response;
                            std::getline(input_stream, response);
                            buffer->consume(bytes);
                            if (options.is_logged)
                            {
                                handle_response(response);
                            }
                            else
                            {
                                std::cout << "Response received: " << response << '\n';
                                handle_login(response);
                            }
                            start_read();
                        }
                        else
                        {
                            std::cerr << "Error getting response: " << ec.message() << '\n';
                        }
                    });
            }
            else {
                std::cerr << "Error getting response size: " << ec.message() << '\n';
            }
        });
}
I checked the examples provided by boost::asio, but they did not work for me. I also searched some forums online, but that still did not solve my issue.
The biggest issue is that TCPClient is destructed at the end of the try block. Nobody waits for anything, and the destructor stops the io context. You could see this with a debugger, or e.g. by adding some tracing to the constructor and destructor.

Review Remarks
"Never" manually invoke a destructor.
Don't unnecessarily use dynamic allocation (even when using shared or other smart pointers).
Don't pass local variables to async operations by reference (like request_size, which is a poor name for that variable, by the way).

When you use an istream on a streambuf, you MUST NOT consume the bytes, because the stream extraction operations already do that!

streambuf::prepare is only good to use with async operations that require a single, fixed buffer (like tcp::socket::async_read_some). For composed operations (which take a dynamic buffer), just pass the streambuf.

Asynchronous operations ALWAYS return immediately, so by definition before they have completed. Therefore, this cannot work:
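That is, quoting connect() from the question:

void TCPClient::connect()
{
    asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (ec)
        {
            std::cerr << "Error connecting: " << ec.message() << '\n';
        }
    });
    start_read(); // runs immediately, before async_connect has completed
    io_thread = std::thread([this]() { context.run(); });
}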
At the very least, the start_read() needs to appear inside the completion handler for async_connect. I do agree that starting the io_context::run after posting the first async operation is good. However, since you have a work guard, it is redundant, and you should probably just start the thread in the constructor for clarity.
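A minimal sketch of that change, keeping the rest of the question's structure:

void TCPClient::connect()
{
    asio::async_connect(*socket, endpoints, [this](std::error_code ec, tcp::endpoint) {
        if (!ec)
            start_read(); // only start reading once the connection is established
        else
            std::cerr << "Error connecting: " << ec.message() << '\n';
    });
    // with the work guard in place, this thread could just as well be
    // started in the constructor instead of here
    io_thread = std::thread([this] { context.run(); });
}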
More Problems (reading on)
You post the write loop. But it may block (due to the locks), and also it just launches async operations? Perhaps you meant to use the io thread to ensure serialized access to the members. In that case, why have the mutex as well? Just post the push_request itself:
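A sketch of what that could look like; it assumes the requests container becomes a std::deque<std::string> and that all queue access then happens on the io thread, so the mutex is no longer needed. do_write_loop is the serialized write loop shown further down:

void TCPClient::push_request(std::string request) // by value, so the handler can own it
{
    asio::post(context, [this, request = std::move(request)]() mutable {
        requests.push_back(std::move(request)); // assumes std::deque instead of std::queue
        if (requests.size() == 1)               // no write chain currently in flight
            do_write_loop();
    });
}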
Also, don't move the buffers: this again causes the buffer to be a local variable, which is not suited for async_write. Instead, leave the message in the queue (std::deque has reference stability for insertion/removal at either end).

Combine the writes:
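For example, with a buffer sequence the size line and the payload can go out in a single composed async_write. Here length_line is an assumed std::string member, so both buffers outlive the operation:

length_line = std::to_string(requests.front().size()) + delimeter;
std::array<asio::const_buffer, 2> buffers{
    asio::buffer(length_line),
    asio::buffer(requests.front()),
};
asio::async_write(*socket, buffers,
    [](std::error_code, std::size_t) { /* handle the result */ });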
Now the entire write loop simplifies to:
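Something along these lines (again assuming a std::deque<std::string> requests and the length_line member, with everything running on the io thread):

void TCPClient::do_write_loop()
{
    if (requests.empty())
        return;

    length_line = std::to_string(requests.front().size()) + delimeter;
    std::array<asio::const_buffer, 2> buffers{
        asio::buffer(length_line),
        asio::buffer(requests.front()),
    };
    asio::async_write(*socket, buffers,
        [this](std::error_code ec, std::size_t bytes) {
            if (ec) {
                std::cerr << "Error sending request: " << ec.message() << '\n';
                return;
            }
            std::cout << "Request (" << bytes << " bytes) was sent.\n";
            requests.pop_front();
            do_write_loop(); // keep writing until the queue is drained
        });
}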
Loose Ends
As mentioned, the consume is redundant. However, so is the istream?! Just simplify, let the stream work for you.
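For instance, for the size line the getline alone already consumes up to and including the delimiter, so the explicit consume and the substr can be dropped (a sketch, not the answer's exact listing):

std::istream input_stream(buffer.get());
std::string response_size_str;
std::getline(input_stream, response_size_str); // consumes from the streambuf itself
std::size_t response_size = std::stoul(response_size_str);
// no buffer->consume(bytes) and no substr needed here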
Don't take constructor arguments by mutable reference needlessly.

It's unclear what the condition is waiting for. Apparently, after sending the key request, it waits... for a file to magically appear on the filesystem. This seems like a pretty brittle choice of IPC mechanism, especially since all the synchronizing parts are in this same process, as far as I can see. For now, let's just put the magic spell in do_write_loop:
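Presumably the "magic spell" is the options.condition notification that used to live in push_request; a guess at how it could sit in do_write_loop's completion handler:

// inside the async_write completion handler of do_write_loop
std::cout << "Request (" << bytes << " bytes) was sent.\n";
options.condition.notify_one(); // wake the GUI thread waiting on options.condition
requests.pop_front();
do_write_loop();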
REVIEWED LISTING

Live On Coliru
Live Demo