asio TCP Client breaks when async

I am currently trying to communicate from Unreal Engine 5 with an existing piece of software we have running at our company. This software accepts TCP connections.

Since it seems to be the de-facto standard for C++, I wanted to use asio for this (standalone, without Boost). Following some online tutorials and posts, I managed to create a working client that connects synchronously and then waits asynchronously for messages. This works as long as the server application is already running when I start the client.

//IOSocket.h (excluding function declarations)
static constexpr int BUFFER_SIZE = 4096;
static constexpr int MAX_CONNECTION_ATTEMPTS = 3;

std::array<char, BUFFER_SIZE> _buffer;
asio::io_context _context;
asio::ip::tcp::endpoint _endpoint;
asio::ip::tcp::socket _socket;
std::queue<std::string> _outMessageQueue;

//IOSocket.cpp
IOSocket::IOSocket(const std::string& hostname, const uint16_t& port) : _buffer(), _socket(_context) {

    asio::ip::address ip;
    if (hostname == "localhost" || hostname == "loopback") {
        ip = asio::ip::address_v4::loopback();
    } else {
        ip = asio::ip::address::from_string(hostname);
    }

    _endpoint = asio::ip::tcp::endpoint(ip, port);

    asio::io_context::work idle(_context);
    _contextThread = std::thread([this]() {
        _context.run();
    });

    Connect();
}

IOSocket::~IOSocket() {
    Disconnect(false);

    _context.stop();
    if (_contextThread.joinable()) {
        _contextThread.join();
    }
}

void IOSocket::Connect() {
    Disconnect();

    //open TCP socket
    asio::error_code error;
    _socket.connect(_endpoint, error);

    if (error) {
        Disconnect();
        return;
    }

    for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
        SendMessage(_outMessageQueue.front());
    }

    //start receive
    ReceiveData();
}

void IOSocket::Disconnect() {
    if (_socket.is_open()) {
        asio::error_code error;
        _socket.shutdown(asio::socket_base::shutdown_both, error);
        _socket.close(error);
    }
}

void IOSocket::ReceiveData() {
    _socket.async_read_some(asio::buffer(_buffer), [&](asio::error_code error, std::size_t receviedBytes) {
        if (!error) {
            const std::vector<std::string> msgs = Utilities::String::Split(_buffer.data(), '*');

            for (const std::string& msg : msgs) {
                //Message gets handled by the owner of the IOSocket
            }
            ReceiveData();
        }
    });
}

void IOSocket::SendMessage(const std::string& message) {
    if (_socket.is_open()) {
        write(_socket, asio::buffer(message));
    } else {
        _outMessageQueue.push(message);
    }
}

However, I want the client to try connecting a few times at startup (3 attempts, ten seconds apart). Once a connection has been established and the server then disappears, the client should keep trying to reconnect indefinitely until the server is back. So I modified the Connect and Disconnect functions to reconnect using an async helper function:

//IOSocket.h additions
std::thread _contextThread;
std::future<void> _waitForReconnect;

//IOSocket.cpp
void IOSocket::Connect(int remainingAttempts) {
    Disconnect(false);

    //open TCP socket
    asio::error_code error;
    _socket.connect(_endpoint, error);

    if (error) {
        remainingAttempts--;
        if (remainingAttempts >= 0) {
            int currentAttempt = MAX_CONNECTION_ATTEMPTS - remainingAttempts;
            Debug::LogWarning(std::format("Connection attempt {0}/{1} failed: {2}", currentAttempt, MAX_CONNECTION_ATTEMPTS, error.message()));
            Disconnect(remainingAttempts > 0, remainingAttempts);
        } else {
            Debug::LogWarning(std::format("Connection attempt failed: {0}", error.message()));
            Disconnect(true);
        }

        return;
    }

    Debug::Log(std::format("Connected to Solid at {0}:{1}", _endpoint.address().to_string(), _endpoint.port()));

    for (; !_outMessageQueue.empty(); _outMessageQueue.pop()) {
        SendMessage(_outMessageQueue.front());
    }

    //start receive
    ReceiveData();
}

void IOSocket::Disconnect(bool tryReconnect, int remainingAttempts) {
    if (_socket.is_open()) {
        asio::error_code error;
        _socket.shutdown(asio::socket_base::shutdown_both, error);
        _socket.close(error);
    }

    if (tryReconnect) {
        _waitForReconnect = Utilities::Delay(10000, [this](int n) { Connect(n); }, remainingAttempts);
    }
}

//Utilities.h
template<typename Fn, typename... Args>
static auto Delay(uint32_t delayMilliseconds, Fn&& fn, Args&&... args) {
    return std::async(std::launch::async, [=](){
        if (delayMilliseconds > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMilliseconds));
        }
        fn(args...);
    });
}

This nearly works. I get a connection and I can send messages, but I don't receive anything. If I debug and set a breakpoint in any delayed function, the io_context is "stopped" and "outstanding_work" is 0. Even if I restart it and give it new "fake" work to do, I still can't get it to process incoming messages. If I make the initial connection attempt synchronously (and the server is running), this still produces a functioning connection, but reconnects don't. If I make the initial attempt asynchronously, it shows the same behaviour as the reconnects. It makes no difference whether I call it through my Delay function, an explicit std::thread, or a manual std::async call.

I am quite new to asio and I haven't written C++ code in nearly a decade (I usually use C#), so I am sure there is a lot I am doing wrong, but I just can't figure out what it is.

There is 1 answer

Super-intelligent Shade

From the code you've posted, you seem to be over-complicating things a bit. Here is a simple client that makes up to 3 connection attempts, 10 seconds apart, and, once it has been connected and loses the connection, keeps retrying indefinitely. Notice there are no threads involved here -- only async operations:

#include <asio.hpp>
#include <chrono>
#include <iostream>

using asio::ip::tcp;
using namespace std::chrono_literals;

struct client
{
    explicit client(asio::io_context& ctx) :
        socket{ctx}, endpoint{asio::ip::address_v4::loopback(), 8080}, timer{ctx}
    { connect_in(0s); }

private:
    tcp::socket socket;
    tcp::endpoint endpoint;
    asio::steady_timer timer;

    std::string data;

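    int ttl = 3; // remaining initial attempts; set to -1 after the first successful connect so later retries never run out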
    void connect_in(std::chrono::seconds s)
    {
        if (ttl)
        {
            timer.expires_after(s);
            timer.async_wait([&](asio::error_code ec)
            {
                std::cout << "connect... " << std::flush;

                if (!ec) socket.connect(endpoint, ec);
                if (!ec)
                {
                    std::cout << "OK" << std::endl;
                    ttl = -1;
                    recv_data();
                }
                else
                {
                    std::cout << "FAILED" << std::endl;
                    if (ttl > 0) --ttl; // maybe negative
                    connect_in(10s);
                }
            });
        }
        else std::cout << "ABANDON" << std::endl;
    }

    void recv_data()
    {
        asio::async_read_until(socket, asio::dynamic_buffer(data), '*', [&](asio::error_code ec, std::size_t n)
        {
            if (!ec)
            {
                auto line = data.substr(0, n);
                line.pop_back();
                data.erase(0, n);

                std::cout << line << std::endl;
                recv_data();
            }
            else
            {
                std::cout << "disconnect" << std::endl;
                socket.close();
                connect_in(10s);
            }
        });
    }
};

int main()
{
    asio::io_context ctx;
    client client{ctx};
    ctx.run();
}
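
The framing step inside recv_data above (take everything up to the delimiter, strip it, keep the rest for later) can be tried out without a socket. Here is a minimal sketch using only the standard library, assuming messages are terminated by '*' as in the question. The name extract_messages is a hypothetical helper of my own, not part of asio:

```cpp
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper (not part of asio): appends newly received bytes to
// `pending` and returns every complete delimiter-terminated message, leaving
// any trailing partial message in `pending` for the next call.
std::vector<std::string> extract_messages(std::string& pending,
                                          std::string_view received,
                                          char delimiter = '*')
{
    pending.append(received);

    std::vector<std::string> messages;
    std::size_t start = 0;
    for (std::size_t end = pending.find(delimiter, start);
         end != std::string::npos;
         end = pending.find(delimiter, start))
    {
        // Copy the message without its delimiter.
        messages.emplace_back(pending, start, end - start);
        start = end + 1;
    }

    pending.erase(0, start); // keep only the incomplete tail
    return messages;
}
```

If you stay with a fixed-size buffer and async_read_some, pass only the bytes that were actually received, for example std::string_view{_buffer.data(), receviedBytes}. The array is not null-terminated, so handing _buffer.data() alone to a split function can also pick up stale bytes from an earlier read.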

Since I don't have access to your server, here is a simple server that listens for connections and sends "ping" messages every 2 seconds:

#include <asio.hpp>
#include <chrono>
#include <iostream>

using asio::ip::tcp;
using namespace std::chrono_literals;

struct session
{
    explicit session(tcp::socket s) :
        socket{std::move(s)}, timer{socket.get_executor()}
    {
        std::cout << this << ": create" << std::endl;
        send();
    }

    ~session() { std::cout << this << ": delete" << std::endl; }

private:
    tcp::socket socket;
    asio::steady_timer timer;

    void send()
    {
        timer.expires_after(2s);
        timer.async_wait([&](asio::error_code ec)
        {
            if (!ec)
            {
                std::cout << this << ": ping" << std::endl;
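                // Pass the length explicitly: buffer("ping*") alone would also send the literal's trailing '\0'
                socket.send(asio::buffer("ping*", 5), { }, ec);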
                if (!ec)
                {
                    send();
                    return;
                }
            }

            delete this;
        });
    }
};

struct server
{
    server(asio::io_context& ctx) :
        acceptor{ctx, tcp::endpoint{tcp::v4(), 8080}},
        socket{ctx}
    { accept(); }

private:
    tcp::acceptor acceptor;
    tcp::socket socket;

    void accept()
    {
        acceptor.async_accept(socket, [&](asio::error_code ec)
        {
            if (!ec) new session{std::move(socket)};
            accept();
        });
    }
};

int main()
{
    asio::io_context ctx;
    server server{ctx};
    ctx.run();
}

Link to both: https://godbolt.org/z/q9TjnP6Ws