C++ ASIO deal with coroutine and strands

27 views Asked by At

I'm struggling to use isio::strand<> on coroutine. I need to send request to a server which is mainly asynchronous, for each request I send I would like to wait to receive the ACK before performing the next request.

I have tested the following implementation

#include <asio.hpp>

namespace io = asio;
using tcp = io::ip::tcp;

struct message {
    static constexpr std::size_t HDR_LEN = 32;
    std::uint32_t id;
    enum type {
        REQ, ACK, KEEP_ALIVE
    } type;
    enum code {
        OK, SYNTAX_ERROR, PROTOCOL_ERROR
    } code;
    std::string data;

    std::string pack() const;
    static message unpack( const std::string & );
};

io::awaitable<void> send_message( tcp::socket &socket, const message &msg ) {
    auto buffer = msg.pack();
    co_await io::async_write( socket, io::buffer( buffer ), io::use_awaitable );
}

io::awaitable< message > recv_message( tcp::socket &socket ) {
    std::string buffer( message::HDR_LEN, 0 );
    co_await io::async_read( socket, io::buffer( buffer ), io::use_awaitable );
    auto msg = message::unpack( buffer );
    if( msg.data.size() ){
        co_await io::async_read( socket, io::buffer( msg.data ), io::use_awaitable );
    }
}

io::awaitable< message > send_request( tcp::socket &socket, const message &req ) {
    co_await send_message( socket, req );
    co_await recv_message( socket );
}

int main(int argc, char **argv) {

    io::io_context ctx;
    tcp::socket client( ctx );
    io::strand< io::any_io_executor > strand( ctx );

    io::co_spawn( ctx, [&client]() -> io::awaitable< void > {
        auto ex = co_await io::this_coro::executor;
        
        tcp::acceptor acceptor( ex, { tcp::v4(), 1234 } );
        client = co_await acceptor.async_accept( io::use_awaitable );
        
        io::steady_timer keepalive_timer( ex );
        
        for(;;) {
            keepalive_timer.expires_after( std::chrono::seconds( 5 ) );
            co_await send_request( client, {
                .type = message::type::KEEP_ALIVE,
                .code = message::code::OK,
                .data = {}
            });         
        }

    }, io::detached );


    // Just for the test to see in which order req/ack are sent/received
    for( std::uint32_t i = 0 ; i < 10 ; i++ ) {
        std::thread([&ctx, &strand, &client, i](){
            io::co_spawn( ctx, [&client, &strand, i]() -> io::awaitable<void>{
                co_await io::post( strand, io::use_awaitable ); // How to lock other threads before the full completion of this one ? 
                co_await send_request( client, {
                    .id = i,
                    .type = message::type::REQ,
                    .code = message::code::OK,
                    .data = std::format("Thread n°{}", i)
                });
            }, io::detached );
        });
    }
    
    ctx.run();

    return 0;
}

But the sequence of the completions are the one I expected:

- Send REQ 1
- Send REQ 2
- Send REQ 3
- Send REQ 4
- ...
- Recv ACK 1
- Recv ACK 2
- Recv ACK 3
- Recv ACK 4

What did I need to change to get:

- Send REQ 1
- Recv ACK 1
- Send REQ 2
- Recv ACK 2
- ...
0

There are 0 answers