I've tried to modify the example from here to make the processes run in parallel. In my use case the processes can be idle for a large portion of their lifecycle, so CPU resources are better utilized when running in parallel.
- First I tried having each task run from a `boost::thread_group` object. While running it, I sometimes got unexpected behavior that crashed for the reasons below:
```
a.out(17512,0x1dac25c40) malloc: *** error for object 0x600003d60000: pointer being freed was not allocated
a.out(17512,0x1dac25c40) malloc: *** set a breakpoint in malloc_error_break to debug
libc++abi: terminating due to uncaught exception of type std::__1::future_error: The associated promise has been destructed prior to the associated state becoming ready.
```
- Then I tried replacing the thread_group and running everything from the io_context using `boost::asio::post(ioc, [&]{ ... })`, but that didn't go well either: I got a deadlock where the `std::future` blocks the thread and prevents the process from running. I guess it's possible to add more threads to run the io_context, but I'd prefer to have some kind of `std::future` that can yield.
I'd be happy to hear suggestions to repair my code:
```cpp
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/thread.hpp>
#include <iostream>

using duration = std::chrono::system_clock::duration;
namespace asio = boost::asio;
using namespace std::chrono_literals;

std::string ExecuteProcess(boost::filesystem::path exe,
                           std::vector<std::string> args, //
                           duration time,                 //
                           std::error_code& ec,           //
                           asio::io_context& ioc) {
    namespace bp = boost::process;
    std::future<std::string> data, err_output;
    auto const deadline = std::chrono::steady_clock::now() + time;

    bp::group g;
    ec.clear();
    bp::child child(exe, args, ioc, g, bp::error(ec), bp::std_in.null(),
                    bp::std_out > data, bp::std_err > err_output);
    if (ec) {
        return {};
    }

    if (data.wait_until(deadline) == std::future_status::ready) {
        return data.get();
    }

    if (std::error_code ignore; child.running(ignore)) {
        g.terminate(ignore);
    }
    ec = make_error_code(asio::error::timed_out); // TODO FIXME
    return {};
}

int main() {
    constexpr duration timeout = 20s;
    [[maybe_unused]] constexpr auto script1 = "/usr/bin/curl http://httpbin.org/ip -m 5";
    [[maybe_unused]] constexpr auto script2 =
        R"(delay="0.5"; sleep "$delay"; echo -n "sanity restored after $delay")";

    asio::io_context ioc;
    auto work = make_work_guard(ioc); // prevent running out of work
    std::thread io_thread([&ioc] { ioc.run(); });

    // Option 1: use thread group
    // boost::thread_group worker_threads;
    // for (int i = 0; i < 20; i++) {
    //     worker_threads.create_thread([&]() {
    //         std::error_code ec;
    //         auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
    //         std::cout << "got " << ec.message() << ": " << s << std::endl;
    //     });
    // }
    //
    // work.reset(); // allow running out of work
    // io_thread.join();
    // worker_threads.join_all();

    // Option 2: post to the io_context
    for (int i = 0; i < 20; i++) {
        boost::asio::post(ioc, [&]() {
            std::error_code ec;
            auto s = ExecuteProcess("/bin/bash", {"-c", script2}, timeout, ec, ioc);
            std::cout << "got " << ec.message() << ": " << s << std::endl;
        });
    }
    work.reset(); // allow running out of work
    io_thread.join();
}
```
The code can be compiled for testing using the following command:

```
g++ -std=c++20 -g -O3 -Wall -pedantic -pthread -lboost_{thread,coroutine,context} ~/main.cpp -I<path_to_boost_headers> -L<path_to_boost_libs>
```
Yeah. It's a bit annoying that `boost::process::child` doesn't expose a regular async interface compatible with completion tokens, despite the fact that it tightly integrates with Asio and naturally deals with 100% asynchronous processes.

This interface does exist: https://beta.boost.org/doc/libs/1_82_0/doc/html/boost/process/async_system.html. However, it cannot work with C++14-only completion tokens. Also, it has the very old anti-pattern of passing a reference to `asio::io_context` around. This makes it hard to intelligently use it e.g. with a strand or even with `asio::thread_pool`.

However, we can roll our own initiation function that gets around those downsides. Sadly, we will always need an `io_context` instance, though you might choose to use an internal ("global") instance hidden from the interface.

With some convenience typedefs in place, let's define our completion token interface.
Before we dive in deep, this is how we expect to use it to run many processes in parallel under some time constraints and handle each result as soon as it is ready:
The embedded bash script sleeps randomly between 1..10 * 10ms, and returns the corresponding exit code 1..10. Since we start them all "immediately", we expect the results to come in ascending order of delay, but not necessarily in order of job number (#i).
Since all delays > 50ms should time out, we expect the last few entries to show errors.
The Magic
In the implementation details we need to deduce an associated executor (in case the completion token is e.g. `bind_executor(my_strand, asio::use_awaitable)`). Next up, some resources need to be allocated so they remain stable for the duration of our async operation:
Stable state contains all the things that cannot be (cheaply) moved:
The remaining bits of the initiation function may look like this:
Most of the remaining code makes sure that the right executor is used¹, that the allocations are freed at the required moments, and adds various asserts to guard against programming mistakes.
DEMO TIME
Of course, where would we be without proof of the pudding:
Live On Coliru
¹ There's a subtle race still here: even if `terminated == false` we still have to check that the `out` future is `ready`. If we don't, we can get very unlucky and hit a race where the future is never ready but the process has been killed. I suspect this indicates that Boost Process doesn't actually perform all the work on a service thread (as required). I'll probably look into this later.