I have a situation where N threads are working on a data structure simultaneously in small incremental steps. However, sometimes a synchronized action needs to take place.
So all threads need to halt, wait for one of the threads to perform this operation, and then continue. I'm looking for a method that has as little impact as possible on the threads when the synchronized action is NOT required.
A simple option is using a `shared_mutex`, but I think an option with lower overhead is possible. I've attempted my own solution using a barrier and an atomic below.
So my questions are: Is this an effective solution for the problem? Is there a better solution?
#include <cstdlib>
#include <vector>
#include <thread>
#include <atomic>
#include <barrier>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;
    std::barrier barrier { nr_threads };
    std::atomic_bool sync_required { false };
    auto rare_condition = []() { return std::rand() == 42; };
    for (size_t i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            while (true)
            {
                if (sync_required)
                {
                    if (i == 0)
                    {
                        barrier.arrive_and_wait();
                        sync_required = false;
                        // solo synchronized work
                        barrier.arrive_and_wait();
                    }
                    else
                    {
                        barrier.arrive_and_wait();
                        barrier.arrive_and_wait();
                    }
                }
                // standard loop body ...
                // sometimes a global synchronized action is required
                if (rare_condition()) sync_required = true;
            }
        });
    }
    // eventually ... threads quit
    for (auto& thread : threads)
    {
        thread.join();
    }
}
An alternative solution uses a `shared_mutex`, but it seems to need a condition variable as well?
#include <cstdlib>
#include <atomic>
#include <thread>
#include <vector>
#include <shared_mutex>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;
    std::shared_mutex sync_mtx;
    std::atomic_bool sync_required { false };
    auto rare_condition = []() { return std::rand() == 42; };
    for (size_t i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            std::shared_lock shared_lock { sync_mtx };
            while (true)
            {
                // very rarely another thread requires all the others to stop for a bit
                if (sync_required)
                {
                    if (i == 0)
                    {
                        // drop the shared lock and take a unique one; seems a little odd but necessary
                        shared_lock.unlock();
                        {
                            std::unique_lock unique_lock { sync_mtx };
                            sync_required = false;
                            // solo sync work
                        }
                        shared_lock.lock();
                    }
                    else
                    {
                        shared_lock.unlock();
                        // todo: need a condition variable here, which adds more complexity to this solution?
                        shared_lock.lock();
                    }
                }
                // sometimes a global synchronized action is required
                sync_required = sync_required || rare_condition();
            }
        });
    }
    // eventually ... threads quit
    for (auto& thread : threads)
    {
        thread.join();
    }
}
The major optimization I'll propose to your code is to replace the barrier-based handshake, where thread 0 is hard-coded to perform the synchronized work, with a wait-free election of the thread that runs `sync()`.
If the targeted platform has a wait-free `std::atomic<int>::fetch_sub` (hopefully the case with all standard libraries on x64), this code can now choose a thread to run `sync()` in a wait-free manner, and the chosen thread starts running `sync()` immediately. This should be much better than a lock/barrier, if I didn't mess up that bit of lock-free code (that's a big IF).

My second suggestion is to use `std::counting_semaphore` to let the `sync()` thread notify the waiting threads that the sync period is over. The use case is well described on cppreference.

With the two proposed optimizations, the thread chosen to run `sync()` never needs to acquire a lock or wait on some barrier/condition variable. This is highly desirable: the faster it finishes its `sync()` section, the faster the whole system can restart.

Finally, the full code is on godbolt.
It is certainly possible to use a weaker memory order than
SEQ_CSTon some of the atomic operations. Reasoning about weaker memory orders is far above my skill, so I'm leaving it this way.