Boost async_pipe not showing all child process output


I ran into a roadblock. The code below has issues, but this is just a demo; I want to get the high level logic correct first.

The two applications print a lot of startup info before arriving at the "ready" state. In this state, Program A is ready for user input via stdin. Program B just listens on a network connection, ingesting and recording data.

Ideally, with this sample program, I should be able to see the output from Program B, in "real-time". But at each loop iteration, nothing happens; I'm not sure it's receiving input via its pipe.

I was previously using bp::opstream to write to the stdin of the child, Program A. I know that if some commands are accepted by Program A via its async_pipe, Program B should also show some logging info (e.g. "trip"). These are Windows console applications, and I'm using Boost C++ to interact with them as child processes.

Does anyone have any ideas what's going on?



std::size_t read_loop(bp::async_pipe& p, mutable_buffer buf, boost::system::error_code &err)
{
    return p.read_some(buf, err);
}



void read_loop_async(bp::async_pipe& p, mutable_buffer buf, std::error_code &err) {
    p.async_read_some(buf, [&p, buf, &err](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;
        err = ec;

        if (!ec)
            read_loop_async(p, buf, err);

    });
}


void write_pipe(bp::async_pipe&p, mutable_buffer buf)
{
    ba::async_write(p, buf, [](boost::system::error_code ec, std::size_t sz)
    {
        std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
    });

}


int main()
{

    bp::opstream sendToChild;
    string wd = "<---path-to-working-dir----->";
    ba::io_service ios;
    string bin = "<path-to-bin-and-name>";


    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);

    bp::child c(bin, "arg1", "arg2", "arg3", bp::std_out > output_pipe,
        bp::std_in < input_pipe, ios, bp::start_dir(wd.c_str()));

    size_t size = 8192;
    string input;

    vector <char> buffer(size);
    boost::system::error_code ec;

    std::error_code err;

    ios.run();
    while (1)
    {
        //show read whatever is available from the childs output_pipe
        read_loop_async(output_pipe, bp::buffer(buffer), err);



        cout << "\nBoot-> ";
        cin >> input;
        if (input == "1")
        {
            cout << "   send input to child: ";
            cin >> input;
            //send commands to the child, Program A
            //originally
            //sendToChild << input<< endl;
            write_pipe(input_pipe, bp::buffer(input));
        }
        if (input == "quit")
        {
            //sendToChild << input << endl;
            read_loop_async(output_pipe, bp::buffer(buffer), err);
            break;
        }

        ios.poll(ec);
        ios.restart();

    }

    c.join();
    cout << "done...";
    cin >> input;
}



Here is the link I followed: How to retrieve program output as soon as it printed?

Answer by sehe:

Hmm. There's a lot to unpack. First off:

ios.run();

runs until the child completes. It might well deadlock if the child process needs to send more output than fits in the buffers, since you don't consume any of it before doing ios.run().

The next poll() by definition does not do anything, because you didn't call restart first. Luckily you ignore the error codes and restart happens next.

Then you get the next problem: the next iteration of the loop starts with another read_loop_async(output_pipe, bp::buffer(buffer), err); which means you have overlapping read operations. That is usually forbidden (Undefined Behaviour), and it is UB here in any case because both reads use the same buffer.

This in itself is more than enough to explain "lost data" since, yeah, you're doing multiple reads in the same location, so one would clobber the other. That is, if you could reason about it, because you cannot reason about UB.

Weirdly, my eye now spots even a third invocation of read_loop_async. It makes no sense. As the name suggests, read_loop_async is already a loop: it calls itself when completed:

    if (!ec)
        read_loop_async(p, buf, err);

So, only 1 invocation would ever be expected. It seems like you don't grasp that async_* initiation functions always return immediately (because the operation completes asynchronously). This is also exemplified in the fact that you assign:

    err = ec;

Where err is a reference argument to the initiation function. It doesn't work like that. The error is only available on completion. Since you don't seem to use it outside the read loop anyways, I'll drop it.

Then there's

        sendToChild << input << std::endl;

Which does absolutely nothing, since sendToChild is literally only declared, and never used elsewhere.

write_pipe again tries to use an async_ initiation, but it cannot, because it's being used in a synchronous input loop. Just don't use async there. As written it was another source of UB, because the buf argument would point to a std::string variable that was being mutated in the main function. So, simplify:

void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto       sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}

[Note how it correctly marks buf as const_buffer.]

Now, probably fix that sendToChild use by:

  • also closing the pipe (signaling EOF to the child)
  • breaking the input loop

    if (input == "quit") {
        write_pipe(input_pipe, bp::buffer(input + "\n"));
        input_pipe.close();
        break;
    }

I'll replace the ios.restart() stuff with just poll(), because we no longer run() it too early.

Other than the above, I replaced the operator>> with std::getline calls because you most likely want the user input to be delimited by Enter, not by spaces. I also added "\n" as you had in the sendToChild line, because it helps when demonstrating with a simple test child that uses line-buffered input.
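The difference between the two ways of reading input can be seen with a string stream in place of std::cin. This is only an illustrative sketch using the standard library; first_token and first_line are made-up helper names.

```cpp
#include <sstream>
#include <string>

// operator>> stops at the first whitespace character.
std::string first_token(const std::string& text) {
    std::istringstream in(text);
    std::string token;
    in >> token;
    return token;
}

// std::getline reads up to (and discards) the newline.
std::string first_line(const std::string& text) {
    std::istringstream in(text);
    std::string line;
    std::getline(in, line);
    return line;
}
```

For the input "Hello world\nBye world\n", the first helper yields only "Hello", while the second yields the whole command "Hello world", which is what the child should receive.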

Now, we'll use this as a test child:

bp::child c(bin, "-c",
            "time while read line; do echo \"$line\" | rev | xxd; done", //
            bp::std_out > output_pipe,
            bp::std_in < input_pipe, //
            ios,                     //
            bp::start_dir(wd));

Which means we get our input echoed in reverse and hex dump, and a time summary at the end.

Somewhat Fixed Listing


#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <limits>
#include <string>
#include <vector>
namespace bp = boost::process;
using boost::asio::const_buffer;
using boost::asio::mutable_buffer;
using boost::system::error_code;

void read_loop_async(bp::async_pipe& p, mutable_buffer buf) {
    p.async_read_some(buf, [&p, buf](std::error_code ec, size_t n) {
        std::cout << "Received " << n << " bytes (" << ec.message() << "): '";
        std::cout.write(boost::asio::buffer_cast<char const*>(buf), n) << std::endl;

        if (!ec)
            read_loop_async(p, buf);
    });
}

void write_pipe(bp::async_pipe& p, const_buffer buf) {
    error_code ec;
    auto       sz = write(p, buf, ec);
    std::cout << "Size Written " << sz << " Ec: " << ec << " " << ec.message() << '\n';
}

int main() {
    std::string wd = "/home/sehe/Projects/stackoverflow";
    boost::asio::io_service ios;
    std::string bin = "/bin/bash";


    bp::async_pipe input_pipe(ios);
    bp::async_pipe output_pipe(ios);

    bp::child c(bin, "-c",
                "while read line; do echo \"$line\" | rev | xxd; done", //
                bp::std_out > output_pipe,
                bp::std_in < input_pipe, //
                ios,                     //
                bp::start_dir(wd));


    // Single invocation!
    std::vector<char> buffer(8192);
    read_loop_async(output_pipe, bp::buffer(buffer));

    std::cout << "\nBoot-> ";
    for (std::string input; getline(std::cin, input);
         std::cout << "\nBoot-> ") {
        if (input == "1") {
            std::cout << "   send input to child: ";
            if (getline(std::cin, input)) {
                write_pipe(input_pipe, bp::buffer(input + "\n"));
            }
        }
        if (input == "quit") {
            write_pipe(input_pipe, bp::buffer(input + "\n"));
            input_pipe.close();
            break;
        }

        ios.poll();
    }

    ios.run(); // effectively like `c.wait();` but async

    std::cout << "done...";
    // ignore until line end
    std::cin.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
}

Tested with

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -lboost_{system,filesystem} && ./a.out <<HERE
1
Hello world
Ignored
1
Bye world
quit
HERE

Prints

Boot->    send input to child: Size Written 12 Ec: system:0 Success

Boot-> 
Boot->    send input to child: Size Written 10 Ec: system:0 Success

Boot-> Size Written 5 Ec: system:0 Success
Received 64 bytes (Success): '00000000: 646c 726f 7720 6f6c 6c65 480a            dlrow olleH.

Received 62 bytes (Success): '00000000: 646c 726f 7720 6579 420a                 dlrow eyB.

Received 57 bytes (Success): '00000000: 7469 7571 0a                             tiuq.

Received 0 bytes (End of file): '
done...

Which is easier to follow interactively on my system:

[image: interactive run of the program]