Win12, boost::asio 1.82.0, MSVC 2022 (I used different compilers, c++ 14 and c++ 20)
I try to understand iocontext step by step. Now I want to use many threads (but I don't want to use thread pool yet, it will be the next step). So, I have the following code
#include <iostream>
#include <random>
#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>
#include <sstream>
namespace asio = boost::asio;
static std::vector<std::string> parseLine(std::string input) {
std::vector<std::string> result;
std::istringstream iss(std::move(input));
for (std::string token; getline(iss, token, ';');)
result.push_back(token);
return result;
}
int main() try {
asio::io_context ioc;
auto work = asio::make_work_guard(ioc);
boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
std::thread thread1([&ioc]() {ioc.run(); });
std::thread thread2([&ioc]() {ioc.run(); });
std::thread thread3([&ioc]() {ioc.run(); });
for (std::string input; getline(std::cin, input);) {
if (input.empty())
break;
for (std::string& token : parseLine(std::move(input))) {
auto handler = [t = std::move(token)] {
std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
};
asio::post(ioc, handler);
}
}
work.reset();
thread1.join();
thread2.join();
thread3.join();
}
catch (std::exception const& exc) {
std::cerr << exc.what() << std::endl;
}
catch (...) {
std::cerr << "An unexpected error has happened" << std::endl;
}
And for input like
aaa;bbb;ccc;ddd;eee;fff;ggg
there is output like
ThreadID = 61740; aaaThreadID = 42060; ccc
ThreadID = 42060; ddd
ThreadID = 55384; bbb
ThreadID =
ThreadID = 42060; eee
ThreadID = 61740; ggg
55384; fff
All three threads are involved. But I want to see each message on a separate line. So, I can use mutex like this
#include <iostream>
#include <random>
#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>
#include <sstream>
#include <mutex>
std::mutex gMutex;
namespace asio = boost::asio;
static std::vector<std::string> parseLine(std::string input) {
std::vector<std::string> result;
std::istringstream iss(std::move(input));
for (std::string token; getline(iss, token, ';');)
result.push_back(token);
return result;
}
int main() try {
asio::io_context ioc;
auto work = asio::make_work_guard(ioc);
boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
std::thread thread1([&ioc]() {ioc.run(); });
std::thread thread2([&ioc]() {ioc.run(); });
std::thread thread3([&ioc]() {ioc.run(); });
for (std::string input; getline(std::cin, input);) {
if (input.empty())
break;
for (std::string& token : parseLine(std::move(input))) {
auto handler = [t = std::move(token)] {
const std::lock_guard<std::mutex> lock(gMutex);
std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
};
asio::post(ioc, handler);
}
}
work.reset();
thread1.join();
thread2.join();
thread3.join();
}
catch (std::exception const& exc) {
std::cerr << exc.what() << std::endl;
}
catch (...) {
std::cerr << "An unexpected error has happened" << std::endl;
}
And in this case I have output like
ThreadID = 60980; bbb
ThreadID = 36852; aaa
ThreadID = 63436; ccc
ThreadID = 63436; fff
ThreadID = 36852; eee
ThreadID = 60980; ddd
ThreadID = 63436; ggg
It's ok, but I want to use only asio objects. So, I tried to use strand like
#include <iostream>
#include <random>
#define _WIN32_WINNT 0x0A00
#include <boost/asio.hpp>
#include <sstream>
#include <mutex>
namespace asio = boost::asio;
static std::vector<std::string> parseLine(std::string input) {
std::vector<std::string> result;
std::istringstream iss(std::move(input));
for (std::string token; getline(iss, token, ';');)
result.push_back(token);
return result;
}
int main() try {
asio::io_context ioc;
auto work = asio::make_work_guard(ioc);
boost::asio::strand<boost::asio::io_context::executor_type> strand(ioc.get_executor());
std::thread thread1([&ioc]() {ioc.run(); });
std::thread thread2([&ioc]() {ioc.run(); });
std::thread thread3([&ioc]() {ioc.run(); });
for (std::string input; getline(std::cin, input);) {
if (input.empty())
break;
for (std::string& token : parseLine(std::move(input))) {
auto handler = [t = std::move(token)] {
std::cout << "ThreadID = " << std::this_thread::get_id() << "; " << t << std::endl;
};
asio::post(strand, handler);
}
}
work.reset();
thread1.join();
thread2.join();
thread3.join();
}
catch (std::exception const& exc) {
std::cerr << exc.what() << std::endl;
}
catch (...) {
std::cerr << "An unexpected error has happened" << std::endl;
}
But now the output is
ThreadID = 63552; aaa
ThreadID = 50452; bbb
ThreadID = 50452; ccc
ThreadID = 50452; ddd
ThreadID = 50452; eee
ThreadID = 50452; fff
ThreadID = 50452; ggg
And the question is: why does strand use only two threads (not all three)? And why does the first thread is used only one time? Is it possible to understand it from asio documentation?
If using a strand specifically avoid distributing all works uniformly. It causes all work to run sequentially on unspecified threads. In simple cases like this it will often use the same thread as an optimization. (see e.g. boost.asio spawn call handler directly from current stack and `strand::running_in_this_thread()` false positive).
Your version with the
gMutexis pretty much the same as my version withosyncstreamand no strand.To write the same with only Asio synchronization, you'd have to have an "output strand":
Live On Coliru
Printing
HOWEVER
There are problems with that approach:
When we post enough jobs to saturate the pool, the output strand is starved! The second command posts 7+26*26 jobs, and all the output appears at the end:
Basically, at 1374ms finally thread 14 gets the greenlight to run the first output that was posted - only after all jobs - and then thread 14 continuous to invoke all the queued up output handlers, with only the very very last jobs being distributed normally, because the pool wasn't saturated anymore for the last ~15 jobs.
It's clear that the only way this will work as you'd like is with a dedicated thread for the output:
Indeed, now we see fair output, always from the same thread:
Summary
Yes, locking has overhead. Also, it doesn't compose well with asynchronous operations. However, you will agree that in this particular case the overhead of locking is likely (much) lower than full-blown thread switching and task scheduling.
As long as the code under lock is "instant" - never blocking - and there are no interdependencies with other async tasks, mutual exclusion is the natural solution, instead of strands.