I'm having trouble understanding the C++ memory model. Here is the example (I tried to simplify it, but it's still a bit long) that I will base my question on:
```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <random>
#include <thread>

constexpr size_t hardware_destructive_interference_size = 128;

constexpr int maxWait = 1000;
constexpr int experimentRuns = 100;
constexpr int experimentLength = 1'000;

static void waitRandom() {
  if (maxWait == 0) return;
  thread_local std::mt19937 gen{std::random_device{}()};
  std::uniform_int_distribution dist(0, maxWait);
  auto waitIterations = dist(gen);
  for (int i{0}; i < waitIterations; ++i) {
    [[maybe_unused]] volatile int doNotOptimize = 0;
  }
}

template <typename INPUT>
class Worker {
  using input_t = INPUT;
  using func_t = void (*)(input_t);

 public:
  Worker(func_t fun) {
    mThread = std::thread([this, fun]() mutable {
      while (!stop()) {
        mainLoop(fun);
      }
    });
  }

  ~Worker() {
    {
      std::lock_guard lk{mMutex};
      mStop.store(true, std::memory_order_relaxed);
    }
    mCV.notify_one();
    mThread.join();
  }

  bool ready() { return !mWorkerBusy.load(std::memory_order_acquire); }

  template <typename T>
  void submitJob(T&& input) {
    assert(ready());
    {
      std::lock_guard lk{mMutex};
      mInput.slot = std::forward<T>(input);
    }
    mWorkerBusy.store(true, std::memory_order_relaxed);
    mCV.notify_one();
  }

 private:
  void mainLoop(func_t& fun) {
    waitForJob();
    if (stop()) {
      return;
    }
    fun(input());
    completeJob();
  }

  void waitForJob() {
    assert(!inputValid());
    mWorkerBusy.store(false, std::memory_order_release);
    {
      std::unique_lock lk{mMutex};
      mCV.wait(lk, [&] { return inputValid() || stop(); });
    }
  }

  void completeJob() { mInput.slot.reset(); }
  bool stop() const { return mStop.load(std::memory_order_relaxed); }
  bool inputValid() const { return mInput.slot.has_value(); }

  input_t& input() {
    assert(inputValid());
    return *mInput.slot;
  }

 private:
  std::thread mThread;
  std::mutex mMutex;
  std::condition_variable mCV;
  struct {
    alignas(
        hardware_destructive_interference_size) std::optional<input_t> slot;
  } mInput;
  std::atomic_bool mWorkerBusy = true;
  std::atomic_bool mStop = false;
};

static void asyncProcess(int) { waitRandom(); }

static auto now() { return std::chrono::high_resolution_clock::now(); }

static void runExperiment() {
  Worker w{asyncProcess};
  const auto start = now();
  int input{0};
  for (int j{0}; j < experimentLength; ++j) {
    do {
      ++input;
    } while (!w.ready());
    w.submitJob(input);
  }
  const auto duration = now() - start;
  const auto durationUs =
      std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
  const auto usPerSample = static_cast<double>(durationUs) / input;
  std::cout << usPerSample << "us/Sample" << std::endl;
}

int main() {
  using namespace std::literals::chrono_literals;
  for (int i{0}; i < experimentRuns; ++i) {
    runExperiment();
    std::cerr << i + 1 << " experiment(s) done\n\n";
  }
}
```
Basically, the main thread produces data and a worker thread consumes it, but they may not run at the same pace (producing is faster than consuming).
The "invariants" are:
- worker must finish its current task before starting a new one
- data must be available
One of my misunderstandings lies here:
```cpp
void waitForJob() {
  assert(!inputValid());
  mWorkerBusy.store(false, std::memory_order_release);
  {
    std::unique_lock lk{mMutex};
    mCV.wait(lk, [&] { return inputValid() || stop(); });
  }
}
```
What would prevent the store from being executed after the wait, leaving the program in an undesired state (possibly `mWorkerBusy` remaining true forever, preventing the worker from ever being woken up)?
I've got the same issue with:
```cpp
template <typename T>
void submitJob(T&& input) {
  assert(ready());
  {
    std::lock_guard lk{mMutex};
    mInput.slot = std::forward<T>(input);
  }
  mWorkerBusy.store(true, std::memory_order_relaxed);
  mCV.notify_one();
}
```
What would prevent `notify_one` from being called first (for instance)?
[EDIT] follow-up posted here
1. Mutex Synchronizes
The memory order doesn't guarantee you this.
`std::memory_order_release` prevents reads and writes on the current thread from being reordered after the store, but later memory operations could still be reordered before it, leading to the problem you've described. What actually protects you here is the `std::mutex`. When you lock a mutex:

- [requirements.mutex.general]/8
- [intro.races]/9

Leading back to your example:

- the `store` is sequenced before `unlock()` on the producer thread
- `unlock()` inter-thread happens before `lock()` on a consumer thread
- therefore, the `store` happens before `lock()` on a consumer thread

In simpler terms, when you lock a mutex, you see what other threads have done before unlocking.
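That chain can be illustrated with a minimal, self-contained sketch (names hypothetical, not taken from your `Worker`): the producer writes plain, non-atomic data before its lock guard unlocks, and any thread that locks the mutex afterwards and observes the flag is guaranteed to see the data write too:

```cpp
#include <mutex>
#include <thread>

// Plain, non-atomic data; the mutex alone provides the ordering.
int sharedValue = 0;
bool sharedReady = false;
std::mutex m;

int runHandoff() {
  std::thread producer([] {
    std::lock_guard lk{m};
    sharedValue = 42;    // sequenced before the unlock at scope exit
    sharedReady = true;
  });
  int seen = 0;
  for (;;) {
    std::lock_guard lk{m};  // lock() synchronizes with the producer's unlock()
    if (sharedReady) {      // once the flag is observed...
      seen = sharedValue;   // ...the write to sharedValue happens-before this read
      break;
    }
  }
  producer.join();
  return seen;
}
```

Because the data write is sequenced before `unlock()`, and that `unlock()` inter-thread happens before the consumer's `lock()`, reading `42` here is guaranteed, with no atomics involved.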
2. Loading Atomics Is Unsynchronized
HOWEVER, there are some cases in your program with NO synchronization, and that can have unintended effects:
This refers to the busy-wait loop in `runExperiment`: `do { ++input; } while (!w.ready());`. This code is a mess. There is no form of synchronization, so `!w.ready()` might remain true indefinitely in this loop, with the producer never seeing any change. What might save you is:

- [atomics.order]/11
In case you're wondering why there is no synchronization: you're only loading a value. `std::memory_order_acquire` means that if we see one change to `mWorkerBusy`, we must also see the changes that preceded it. However, it doesn't guarantee that we read the most recent value. In general, no synchronization between atomics takes place as long as you're only loading.
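A minimal sketch of such a load-only spin (hypothetical names): the acquire loads establish no synchronization until one of them actually observes the release store, and nothing bounds how many iterations that takes beyond the "finite time" wording of [atomics.order]/11:

```cpp
#include <atomic>
#include <thread>

std::atomic_bool flag{false};

// Spins on acquire loads only. The acquire ordering makes writes that
// happened before the matching release store visible once the store is
// observed, but it never guarantees *when* the store is observed.
int spinUntilSet() {
  int iterations = 0;
  std::thread writer([] { flag.store(true, std::memory_order_release); });
  while (!flag.load(std::memory_order_acquire)) {
    ++iterations;  // unbounded (but, per the standard, finite) spinning
  }
  writer.join();
  return iterations;
}
```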
3. Condition Variables Synchronize
Absolutely nothing. You're using `std::memory_order_relaxed`, so the operation can be reordered on the current thread past the `.notify_one()`. This is a mistake! Never use `std::memory_order_relaxed` if the order of loads/stores relative to other operations is relevant!

However, the condition variable performs some form of synchronization through the mutex. Whichever thread gets notified will acquire `mMutex`, and will see `mInput.slot = std::forward<T>(input);`, because it happened before `mMutex.unlock()` (implicitly called through the guard).

Even with the "wrong" memory order, we can hijack the mutex to synchronize anyway, by performing the store while still holding the lock.
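For example (a sketch of the idea, reduced to a hypothetical stand-in for the producer side rather than the full `Worker`), the relaxed store can simply be moved inside the critical section:

```cpp
#include <atomic>
#include <mutex>
#include <optional>

// Minimal stand-in for the producer side of Worker (hypothetical names).
struct Submitter {
  std::mutex mMutex;
  std::optional<int> mSlot;
  std::atomic_bool mWorkerBusy{false};

  void submitJob(int input) {
    {
      std::lock_guard lk{mMutex};
      mSlot = input;
      // Relaxed store performed *inside* the critical section: it is
      // sequenced before unlock(), so whoever locks mMutex next sees
      // both the slot and the flag, regardless of the memory order.
      mWorkerBusy.store(true, std::memory_order_relaxed);
    }
    // mCV.notify_one() would go here in the real Worker.
  }

  bool observeUnderLock() {
    std::lock_guard lk{mMutex};
    return mSlot.has_value() && mWorkerBusy.load(std::memory_order_relaxed);
  }
};
```

Here the mutex, not the atomic's memory order, guarantees that any thread locking `mMutex` after `submitJob` sees both writes together.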
4. Fixing Design Issues
As you can see, your use of atomics makes very little sense in both 2. and 3. You expect the atomics to provide synchronization, but it's actually the condition variables and mutexes that do this.
A better way to synchronize producers and consumers is to use multiple condition variables:
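For instance, a single-slot channel with one condition variable per direction might look like this (a sketch with hypothetical names, not drop-in code for `Worker`):

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>

// Single-slot handoff guarded entirely by the mutex; no atomics needed.
class Channel {
  std::mutex mMutex;
  std::condition_variable mCVProducer;  // signaled when the slot becomes free
  std::condition_variable mCVConsumer;  // signaled when the slot becomes full
  std::optional<int> mSlot;

 public:
  void put(int value) {
    std::unique_lock lk{mMutex};
    mCVProducer.wait(lk, [&] { return !mSlot.has_value(); });
    mSlot = value;
    lk.unlock();
    mCVConsumer.notify_one();
  }

  int take() {
    std::unique_lock lk{mMutex};
    mCVConsumer.wait(lk, [&] { return mSlot.has_value(); });
    int value = *mSlot;
    mSlot.reset();
    lk.unlock();
    mCVProducer.notify_one();
    return value;
  }
};
```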
The consumers and producers can then notify one another, and there are no memory-ordering issues as long as you properly use the `std::mutex`.

Another threading primitive that is specifically designed for synchronizing producers and consumers is `std::binary_semaphore`/`std::counting_semaphore`; these would be much better tools in this case.