boost asio tcp connection no received data after reconnecting and reading

49 views Asked by At

I am using Boost.Asio to connect to a TCP server. When I run the code it works fine after start: I send the request and get the response. When I turn off the TCP server (it is a device), I run into the timeout, and the callback passed to boost's async_read is never executed in that case. Then I close the socket. After turning the device back on, the connection can be re-established, but the received buffer size is then 0 bytes. I think that's because the async_read was not finished correctly after the timeout.

header

#include <iostream>
#include <boost/format.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <chrono>
#include <thread>
#include <future>
#include <sstream>
#include <iomanip>


/// Stateless helper that performs TCP operations on a socket owned by the caller.
class TcpClient
{
public:
    /// Connects `socket` to `endpoint`.
    /// @return 1 when the connection was established, 0 otherwise.
    int connect(boost::asio::ip::tcp::socket &socket,
                boost::asio::ip::tcp::endpoint &endpoint);

    /// Sends the request over `socket` and waits for a reply.
    /// @return 1 when a non-empty reply was received, 0 otherwise.
    int writeAndRead(boost::asio::ip::tcp::socket &socket);
};

This is the code

#include "tcpclient.h"

/// Attempts a blocking connect of `socket` to `endpoint` and logs the outcome.
/// @return 1 when the connect succeeded, 0 when it reported an error.
int TcpClient::connect(boost::asio::ip::tcp::socket &socket, boost::asio::ip::tcp::endpoint &endpoint)
{
    boost::system::error_code connectError;
    socket.connect(endpoint, connectError);

    // An error_code that evaluates to false means "no error".
    const bool connected = !connectError;
    std::cout << (connected ? "connected" : "not connected") << std::endl;
    return connected ? 1 : 0;
}

int TcpClient::writeAndRead(boost::asio::ip::tcp::socket &socket)
{

    boost::system::error_code error;
    auto status = std::async(std::launch::async, [&]()
                             { boost::asio::write(socket, boost::asio::buffer("mytext"), error); })
                      .wait_for(std::chrono::milliseconds{1000});

    switch (status)
    {
    case std::future_status::deferred:
        std::cout << "std::future_status::deferred" << std::endl;
        return 0;
    case std::future_status::ready:
        std::cout << "write success" << std::endl;
        break;
    case std::future_status::timeout:
        std::cout << "std::future_status::timeout" << std::endl;
        return 0;
    }
    boost::asio::streambuf receive_buffer;
    boost::optional<boost::system::error_code> read_result;
    boost::optional<boost::system::error_code> timer_result;
    boost::asio::deadline_timer timer(socket.get_io_service());

    timer.expires_from_now(boost::posix_time::seconds(2));

    timer.async_wait([&timer_result](const boost::system::error_code &error)
                     {
        if (error != boost::asio::error::operation_aborted)
        {
            timer_result = error;
        } });

    boost::asio::async_read(socket,
                            receive_buffer,
                            boost::asio::transfer_at_least(1),
                            [&read_result](const boost::system::error_code &ec, std::size_t bytes_transferred)
                            {
                                std::cout << "read_result: " << read_result << std::endl;
                                read_result = ec;
                            });
    boost::system::error_code ec;

    while (1)
    {
        socket.get_io_service().reset();
        int numHandlers = socket.get_io_service().poll_one(ec);
        if (read_result)
        {
            timer.cancel();
            break;
        }
        else if (timer_result)
        {
            timer.cancel();
            std::cout << "timeout" << std::endl;
            return 0;
        }
    }
    if (receive_buffer.size() == 0)
    {
        std::cout << "receive_buffer size 0" << std::endl;
        return 0;
    }
    std::string rawResponse = boost::asio::buffer_cast<const char *>(receive_buffer.data());
    std::cout << "rawResponse: " << rawResponse << std::endl;
    return 1;
}

int main()
{
    std::string ipAddress = "192.168.2.4";
    unsigned short port = 50001;
    boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(ipAddress), port);
    boost::asio::io_service ioService;
    boost::asio::ip::tcp::socket socket{ioService};
    bool isSuccess{false};
    bool isConnected{false};
    TcpClient tcpclient = TcpClient();

    while (1)
    {
        if (!isConnected)
        {
            isConnected = tcpclient.connect(socket, endpoint);
        }
        if (isConnected)
        {
            isSuccess = tcpclient.writeAndRead(socket);
            if (!isSuccess)
            {
                std::cout << "failed close socket" << std::endl;
                socket.close();
                isConnected = false;
            }
            else
            {
                std::cout << "success" << std::endl;
            }
        }
        std::cout << "wait for 1 sec" << std::endl;
        std::chrono::seconds dura(1);
        std::this_thread::sleep_for(dura);
    }

    return 0;
}

this is the output

success
wait for 1 sec
write success
read_result: 0
rawResponse: 0;

success
wait for 1 sec
write success
timeout
failed close socket
wait for 1 sec
not connected
wait for 1 sec
not connected
wait for 1 sec
not connected
wait for 1 sec
not connected
wait for 1 sec
connected
write success
read_result: 0
receive_buffer size 0
failed close socket
wait for 1 sec
connected
write success
read_result: 0
receive_buffer size 0
failed close socket
1

There is 1 answer

6
sehe On

Asio is an asynchronous IO library.

You're using the blocking API with std::async to ... fake asynchronous IO. I'd suggest not doing that. It looks like your code is based on (very) old example code. get_io_service() is long gone and io_service itself is only available as part of deprecated API.

I don't know what compiler you have access to, but this is how I'd write the same code now:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
using Socket = asio::use_awaitable_t<>::as_default_on_t<tcp::socket>;
using namespace std::chrono_literals;
using namespace asio::experimental::awaitable_operators;

/// Suspends the calling coroutine for `dur` using a steady timer bound to the
/// coroutine's own executor.
asio::awaitable<void> delay(std::chrono::steady_clock::duration dur) {
    asio::steady_timer timer(co_await asio::this_coro::executor, dur);
    co_await timer.async_wait(asio::deferred);
}

/// Repeatedly sends the request over `socket` and prints each reply.
///
/// The write races against a 1 s delay and the read against a 2 s delay; the
/// loop ends on either timeout or when the read reports an error. A failing
/// write throws `boost::system::system_error` out of the coroutine, because the
/// socket's default completion token is `use_awaitable`.
asio::awaitable<void> requestLoop(Socket& socket) {
    for (std::string request = "test", response;;) {
        // dynamic_buffer appends, so start every exchange from an empty string;
        // otherwise each printed reply would contain all earlier replies as well.
        response.clear();

        if (auto r = co_await (async_write(socket, asio::buffer(request)) || delay(1s)); r.index()) {
            std::cout << "write timeout" << std::endl;
            break;
        }

        auto r = co_await ( //
            asio::async_read(socket, asio::dynamic_buffer(response), asio::transfer_at_least(1),
                             as_tuple(asio::use_awaitable)) ||
            delay(2s));
        if (r.index()) {
            std::cout << "read timeout" << std::endl;
            break;
        }

        auto [ec, bytes] = get<0>(r);

        std::cout << "Response: " << quoted(response) << " (" << ec.message() << ")" << std::endl;

        // A read error (e.g. end of file) means the connection is unusable:
        // stop here instead of issuing further requests on it.
        if (ec)
            break;
    }
}

/// Connects to `ipAddress:port`, runs the request loop on that connection and,
/// whenever the loop ends or a system error is thrown, waits one second and
/// starts over with a fresh socket. Never completes.
asio::awaitable<void> client(std::string ipAddress, uint16_t port) {
    auto const executor = co_await asio::this_coro::executor;
    while (true) {
        try {
            Socket conn(executor);
            tcp::endpoint const target{asio::ip::address::from_string(ipAddress), port};
            co_await conn.async_connect(target);
            std::cout << "Connected to " << conn.remote_endpoint() << std::endl;

            co_await requestLoop(conn);
        } catch (boost::system::system_error const& failure) {
            std::cout << "Error: " << failure.code().message() << std::endl;
        }
        // Pause between attempts, whether the previous one failed or just ended.
        co_await delay(1s);
    }
}

int main() {
    asio::io_context ioc;
    //co_spawn(ioc, client("192.168.1.1", 8045), asio::detached);
    co_spawn(ioc, client("0.0.0.0", 8045), asio::detached);
    ioc.run();
}

With a local demo:

enter image description here

UPDATE: C++11, Boost 1.53, Single Threading

In response to the added context in the comment, here is a time-traveling version that ... well, goes back in time :)

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <iostream>
namespace asio   = boost::asio;
namespace chrono = std::chrono;
using asio::ip::tcp;
using std::placeholders::_1;

/// Callback-based TCP client: connects to a fixed endpoint, then alternates
/// between sending `request_` and reading a reply. Any error or timeout closes
/// the socket and schedules a reconnect one second later.
struct TcpClient {
    /// @param ipAddress  textual IP address of the server
    /// @param port       TCP port of the server
    TcpClient(std::string ipAddress, uint16_t port) //
        : ep_{asio::ip::address::from_string(ipAddress), port} {}

    /// Starts the first connection attempt and runs the io_service on the
    /// calling thread.
    void run() {
        do_connect();
        ios_.run();
    }

  private:
    using error_code = boost::system::error_code;
    asio::io_service   ios_;
    tcp::endpoint      ep_;
    tcp::socket        socket_{ios_};
    asio::steady_timer timer_{ios_}; // shared by the I/O deadlines and the reconnect delay
    std::string        request_ = "hello";
    asio::streambuf    buffer_;

    /// Closes the socket if it is open and retries the connect after 1 s.
    void start_reconnect() {
        if (socket_.is_open()) {
            error_code ignore;
            socket_.close(ignore);
        }
        // Re-arming the timer cancels any wait still pending on it.
        timer_.expires_from_now(chrono::seconds(1));
        timer_.async_wait(bind(&TcpClient::do_connect, this, _1));
    }

    /// Starts an asynchronous connect unless invoked with an error (for
    /// example a cancelled timer wait).
    void do_connect(error_code ec = {}) {
        if (!ec)
            socket_.async_connect(ep_, bind(&TcpClient::on_connect, this, _1));
    }

    /// Connect completion: enters the request loop or schedules a retry.
    void on_connect(error_code ec) {
        if (ec) {
            std::cout << "Error: " << ec.message() << std::endl;
            return start_reconnect();
        }
        do_request_loop();
    }

    /// Sends the request, guarded by a 1 s deadline.
    void do_request_loop() {
        timer_.expires_from_now(chrono::seconds(1));
        timer_.async_wait(bind(&TcpClient::on_timeout, this, _1));

        async_write(socket_, asio::buffer(request_), bind(&TcpClient::on_request_sent, this, _1));
    }

    /// Write completion: starts reading the reply, guarded by a 2 s deadline.
    void on_request_sent(error_code ec) {
        if (ec) {
            std::cout << "Error: " << ec.message() << std::endl;
            return start_reconnect();
        }

        timer_.expires_from_now(chrono::seconds(2));
        timer_.async_wait(bind(&TcpClient::on_timeout, this, _1));

        async_read(socket_, buffer_, asio::transfer_at_least(1),
                   bind(&TcpClient::on_response_received, this, _1));
    }

    /// Read completion: prints the reply and issues the next request.
    void on_response_received(error_code ec) {
        if (ec) {
            std::cout << "Error: " << ec.message() << std::endl;
            return start_reconnect();
        }
        std::string response(buffers_begin(buffer_.data()), buffers_end(buffer_.data()));
        // Drop the bytes just copied out; otherwise buffer_ keeps growing and
        // every later response would repeat all earlier ones.
        buffer_.consume(buffer_.size());
        std::cout << "Response: \"" << response << "\" (" << ec.message() << ")" << std::endl;
        do_request_loop();
    }

    /// Deadline handler: on expiry (no error) cancels the outstanding socket
    /// operation. A cancelled wait arrives here with an error and is ignored.
    void on_timeout(error_code ec) {
        if (!ec) {
            std::cout << "Timed out" << std::endl;
            socket_.cancel();
        }
    }
};

int main() {
    TcpClient client("0.0.0.0", 8045);
    client.run();
}

With the equivalent demo:

enter image description here

BONUS: Faux Coroutines

Just remembered that with the magic of "faux coroutines" you can write the c++11/boost 1.53 version hiding the callbacks and simulating normal control flow:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/steady_timer.hpp>
#include <iostream>
namespace asio = boost::asio;

/// Posts a stackless-coroutine client onto `ios`. Once the io_service runs, the
/// client connects to `ipAddress:port`, exchanges request/response pairs until
/// an error or timeout occurs, closes the socket, waits one second and connects
/// again.
void run_client(asio::io_service& ios, std::string ipAddress, uint16_t port) {
    using asio::ip::tcp;
    using boost::system::error_code;
    using std::chrono::seconds;

    // Completion handler that doubles as the coroutine body. It is moved into
    // every asynchronous operation, so all state lives in the shared State.
    struct Impl {
        Impl(asio::io_service& ios, tcp::endpoint ep) : st_(std::make_shared<State>(ios, ep)) {}

        struct State : std::enable_shared_from_this<State> {
            State(asio::io_service& ios, tcp::endpoint ep) : s(ios), ep_(ep), timer_(ios) {}

            asio::coroutine    coro;
            tcp::socket        s;
            tcp::endpoint      ep_;
            asio::steady_timer timer_;
            std::string        request_ = "hello";
            asio::streambuf    buffer_;

            /// Arms the timer; if it expires, the socket's pending operations
            /// are cancelled (see on_timer).
            void deadline(asio::steady_timer::duration d) {
                timer_.expires_from_now(d);
                timer_.async_wait(bind(&State::on_timer, shared_from_this(), std::placeholders::_1));
            }

          private:
            /// Timer handler: acts only on real expiry, not on a cancelled wait.
            void on_timer(error_code ec) {
                if (!ec) {
                    std::cout << "Timed out" << std::endl;
                    s.cancel();
                }
            }
        };
        std::shared_ptr<State> st_ ;

#include <boost/asio/yield.hpp>
        /// Coroutine body; re-entered with the result of the previous operation.
        void operator()(error_code ec = {}, size_t /*bytes_transferred*/ = {}) {
            reenter(st_->coro) {
                for (;;) {
                    yield st_->s.async_connect(st_->ep_, std::move(*this));

                    std::cout << "Connection to " << st_->ep_ << " " << ec.message() << std::endl;

                    while (!ec) {
                        st_->deadline(seconds(1));
                        yield async_write(st_->s, asio::buffer(st_->request_), std::move(*this));

                        std::cout << "Written: " << ec.message() << std::endl;
                        if (ec) break;

                        st_->deadline(seconds(2));
                        yield async_read(st_->s, st_->buffer_, asio::transfer_at_least(1), std::move(*this));

                        std::cout << "Read: " << ec.message() << std::endl;
                        if (ec) break;

                        std::string response(buffers_begin(st_->buffer_.data()),
                                             buffers_end(st_->buffer_.data()));
                        std::cout << "Response: \"" << response << "\" (" << ec.message() << ")" << std::endl;

                        // Drop the bytes just copied out; otherwise the buffer
                        // keeps growing and later responses repeat earlier data.
                        st_->buffer_.consume(st_->buffer_.size());
                    }

                    // close the connection
                    st_->s.close(ec);

                    // wait 1s before reconnecting
                    st_->timer_.expires_from_now(seconds(1));
                    yield st_->timer_.async_wait(std::move(*this));
                }
            }
        }
#include <boost/asio/unyield.hpp>
    };

    ios.post(Impl(ios, {asio::ip::address::from_string(ipAddress), port}));
}

int main() {
    asio::io_service ios(1);
    run_client(ios, "0.0.0.0", 8045);
    ios.run();
}

With yet another equivalent demo:

enter image description here