boost::process can't close stdin with pipechain


I have several C utilities. Each of them reads data from stdin, processes it and writes the result to stdout. Each utility exits when its stdin is closed. A simplified example of such a utility is shown below.

child:

#include <stdio.h>

int main() {
    char buf[256];
    size_t nread;

    do {
        nread = fread(buf, 1, sizeof buf, stdin);

        // Process data...

        fwrite(buf, 1, nread, stdout); // write only the bytes actually read
    } while (nread > 0);

    return 0;
}

This allows chaining these utilities with pipes. For example, in bash: u1 < data.bin | u2 | u3 | ... | un > result.bin. Once the first process reaches the end of its input file, it exits and its stdout is closed, so the next process sees end-of-file on its stdin and exits in turn, cascading down the whole chain.

Now I need to run more complicated chains of these utilities from C++; in fact, not just chains but arbitrary graphs.

But there is a problem with stopping the chain. When I run a single child and close its stdin, the child process exits (fread returns 0). But when I run two or more children connected with boost::process::pstream or boost::process::pipe (which one doesn't matter), nothing happens when I close stdin: the first child in the chain keeps waiting for data on stdin. Simple examples are below.

Example 1: one child exits when I close its stdin - OK

#include <boost/process.hpp>

namespace bp = boost::process;

int main(int argc, char *argv[]) {
    bp::opstream cstdin;
    bp::ipstream cstdout;

    bp::child c0("child", (bp::std_in < cstdin), (bp::std_out > cstdout));

    cstdin.pipe().close();
    cstdin.close();

    c0.wait();

    return 0;
}

Example 2: two children don't stop when I close the first child's stdin - PROBLEM

#include <boost/process.hpp>

namespace bp = boost::process;

int main(int argc, char *argv[]) {
    bp::opstream cstdin;
    bp::ipstream cstdout;
    bp::pstream connector;

    bp::child c0("child", (bp::std_in < cstdin), (bp::std_out > connector));
    bp::child c1("child", (bp::std_in < connector), (bp::std_out > cstdout));

    cstdin.pipe().close();
    cstdin.close();

    c0.wait();
    c1.wait();

    return 0;
}

Running on Debian 10, gcc 8.3, boost 1.80.

How can I fix it?

Answer by sehe

I think you're running into synchronous IO deadlocks here. The pipes buffer data, but once a pipe's buffer is full, further writes to it block until the other side reads.

We can demonstrate this with a single child:

#include <cstdio>
int main() {
    for (char buf[256]; int nread = fread(buf, 1, 256, stdin);) {
        // Process data...
        fwrite(buf, 1, nread, stdout);
    }
}

Now, test it with a simple driver program:


#include <boost/process.hpp>
#include <fstream>
#include <iostream>
#include <vector>

namespace bp = boost::process;

int main(int argc, char** argv) {
    bp::opstream cstdin;
    bp::ipstream cstdout;

    bp::child c0(bp::exe("./child.exe"), //
                 (bp::std_in < cstdin), (bp::std_out > cstdout));

    for (auto f : std::vector(argv + 1, argv + argc))
        cstdin << std::ifstream(f).rdbuf() << std::flush;
    cstdin.pipe().close();
    cstdin.close();

    std::cout << cstdout.rdbuf();

    c0.wait();
    return c0.exit_code();
}

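This may complete for small input files, but on my system, adding a larger file such as /etc/dictionaries-common/words makes it block: the parent is still busy writing to the child's stdin, while the child is blocked writing to its stdout pipe, which is full because the parent only starts reading after it has finished writing.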


A simple workaround is to run the input pump on a separate thread:

// replaces the write loop in main() above; needs #include <thread>
std::thread pump([=, &cstdin] {
    for (auto f : std::vector(argv + 1, argv + argc))
        cstdin << std::ifstream(f).rdbuf() << std::flush;
    cstdin.pipe().close();
    cstdin.close();
});

std::cout << cstdout.rdbuf();

c0.wait();
pump.join();

And indeed, now it works, but this doesn't scale well: every pipe you need to feed or drain needs its own thread.

Enter Async IO

To avoid the need for threads (let alone many of them) and the perils of synchronizing access to shared objects, I'd suggest using the async interface of Boost Process:


#include <array>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <fstream>
#include <functional>
#include <iostream>

using boost::system::error_code;
namespace bp = boost::process;

int main(int argc, char** argv) {
    boost::asio::io_context ioc;
    bp::async_pipe cstdin(ioc);
    bp::async_pipe cstdout(ioc);

    bp::child c0(bp::exe("./child.exe"), (bp::std_in < cstdin),
                 (bp::std_out > cstdout));

    ////////////
    // send loop
    std::function<void(error_code, size_t)> send_loop;

    std::array<char, 256> sbuf;
    auto                  b = argv, e = argv + argc;
    std::ifstream         ifs;

    send_loop = [&](error_code ec, size_t) {
        if (ec.failed() || b == e)
            return;

        if (!ifs.is_open() || !ifs) {
            if (ifs.is_open())
                ifs.close();

            if (++b == e) {
                std::cerr << "Done, closing cstdin" << std::endl;
                cstdin.close();
            } else {
                std::cerr << "Opening " << *b << std::endl;
                ifs.open(*b);
            }
        }
        if (ifs) {
            ifs.read(sbuf.data(), sbuf.size());
            async_write(cstdin, boost::asio::buffer(sbuf, ifs.gcount()),
                        send_loop);
        }
    };

    ////////////
    // read loop
    std::function<void(error_code, size_t)> read_loop;
    std::array<char, 256> rbuf;

    read_loop = [&](error_code ec, size_t n) {
        std::cout.write(rbuf.data(), n);
        if (ec) {
            std::cerr << "ec: " << ec.message() << std::endl;
            cstdout.close();
        } else {
            async_read(cstdout, boost::asio::buffer(rbuf), read_loop);
        }
    };

    //////////////////
    // prime the pumps
    send_loop({}, 0);
    read_loop({}, 0);

    ioc.run(); // can also use `async_wait` or `bp::on_exit`

    c0.wait(); // reap the child so exit_code() is valid
    return c0.exit_code();
}

Testing with 3 random source files from Coliru shows the output is correct (both checksums match):

g++ -o child.exe child.cpp
g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp
./a.out /Archive2/d6/ab*/main.cpp | md5sum
cat /Archive2/d6/ab*/main.cpp | md5sum
Opening /Archive2/d6/ab2444de38af0b/main.cpp
Opening /Archive2/d6/ab51ec5464ad66/main.cpp
Opening /Archive2/d6/ab85e7a6ebb346/main.cpp
Done, closing cstdin
ec: End of file
070344d0db595043f05f745b7bee9ced  -
070344d0db595043f05f745b7bee9ced  -

This looks tricky, but that's mainly due to the awkward loop over multiple input files that my choice of test requires.

Abstracting

You can package it all up to make it simpler:

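#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <functional>
#include <string>
#include <vector>

using boost::asio::io_context;
using boost::system::error_code;
using Args   = std::vector<std::string>;
namespace bp = boost::process;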

struct Filter {
    Filter(io_context& ioc, Args const& args, Filter& predecessor)
        : _out(ioc)
        , _c(args, (bp::std_out > _out), (bp::std_in < predecessor._out)) {}
    Filter(io_context& ioc, Args const& args)
        : _out(ioc)
        , _c(args, (bp::std_out > _out), bp::std_in.close()) {}

    int exit_code() const { return _c.exit_code(); }

    void consume(auto buf, std::function<void(error_code, size_t)> cb = {}) {
        async_read(_out, buf, [=, this](error_code ec, size_t n) {
            if (cb) cb(ec, n);
            if (!ec) consume(buf, cb); // loop
        });
    }

  private:
    bp::async_pipe _out;
    bp::child      _c;
};

Now you can write a much more involved example with many child processes:

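// inside main(int argc, char** argv); also needs <iostream> and <list>
boost::asio::io_context ioc;

Args generator { "/bin/cat" };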
generator.insert(generator.end(), argv + 1, argv + argc);

std::list<Filter> chain;
chain.emplace_back(ioc, generator);
chain.emplace_back(ioc, Args{"/usr/bin/base64"}, chain.back());
chain.emplace_back(ioc, Args{"/usr/bin/xz", "-9"}, chain.back());
chain.emplace_back(ioc, Args{"/usr/bin/unxz"}, chain.back());
chain.emplace_back(ioc, Args{"/usr/bin/base64", "-d"}, chain.back());
chain.emplace_back(ioc, Args{"/usr/bin/md5sum"}, chain.back());

std::string result;
chain.back().consume(boost::asio::dynamic_buffer(result));

ioc.run(); // can also use `async_wait` or `bp::on_exit`

std::cout << "Final output: " << result;

return chain.back().exit_code();

This still round-trips as expected:

./a.out /Archive2/d6/ab*/main.cpp
cat /Archive2/d6/ab*/main.cpp | md5sum
Final output: 070344d0db595043f05f745b7bee9ced  -
070344d0db595043f05f745b7bee9ced  -

Closing Thoughts¹

I'd package this up some more, e.g. into a Chain class, perhaps using boost::process::group to get better control over all processes in the chain.

If you want to see more of the async interface of Boost Process, I have several answers on this site that show it off: https://stackoverflow.com/search?tab=newest&q=user%3a85371%20process%20async

Examples include asynchronously downloading the first N million primes from a website and decompressing them on the fly, having two UCI chess engines play a game of chess against each other, and automating ffmpeg via the pipe interface.


¹ no pun intended