Questions regarding lock-free vs mutex solution for Leetcode 1117: Building H2O

I'm practicing C++ concurrency and learning about the practical trade-offs between mutexes and lock-free atomics. I was working on the following problem from Leetcode: https://leetcode.com/problems/building-h2o/description/

Basically, different threads are responsible for either releasing a hydrogen atom or oxygen atom and you want to synchronize so that every consecutive set of 3 atoms consists of 2 hydrogen and 1 oxygen.
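To make the grouping rule concrete, here is a hypothetical checker (`isValidWater` is not part of the LeetCode API, just an illustration) that accepts an output string only if every consecutive group of three atoms contains exactly two `H` and one `O`. The order inside a group does not matter, so `HOH` and `OHH` are both valid groups.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: true if `out` splits into consecutive groups of 3
// atoms, each with exactly 2 'H' and 1 'O' (in any order).
bool isValidWater(const std::string& out) {
    if (out.size() % 3 != 0) return false;  // a partial molecule is invalid
    for (std::size_t i = 0; i < out.size(); i += 3) {
        int h = 0, o = 0;
        for (std::size_t j = i; j < i + 3; ++j) {
            if (out[j] == 'H') ++h;
            else if (out[j] == 'O') ++o;
            else return false;              // unexpected character
        }
        if (h != 2 || o != 1) return false;
    }
    return true;
}
```

For example, `isValidWater("HHOHOHOHH")` is true (groups `HHO`, `HOH`, `OHH`), while `isValidWater("HHHOOH")` is false because the first group has three hydrogens.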

I implemented the solution in 2 ways, one using a mutex and one using atomics.

Mutex Solution:

class H2O {
    mutex m;
    condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(function<void()> releaseHydrogen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(function<void()> releaseOxygen) {
        unique_lock<mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};
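A minimal harness for exercising the mutex version, assuming two hydrogen threads and one oxygen thread (the `runMutexH2O` name and setup are hypothetical). Because `idx` starts at 0 and only oxygen may proceed when `idx == 0`, this design always emits each molecule in the fixed order `OHH`, which the check relies on. Appending to a plain `std::string` inside the callbacks is safe only because both callbacks run while the mutex is held.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// The question's mutex-based H2O, with std:: qualifiers added.
class H2O {
    std::mutex m;
    std::condition_variable m_cond;
    int idx = 0;  // 0: next atom must be O; 1 or 2: next atom must be H
public:
    void hydrogen(std::function<void()> releaseHydrogen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this] { return idx != 0; });
        releaseHydrogen();
        idx = (idx + 1) % 3;
        m_cond.notify_all();
    }
    void oxygen(std::function<void()> releaseOxygen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this] { return idx == 0; });
        releaseOxygen();
        idx = (idx + 1) % 3;
        m_cond.notify_all();
    }
};

// Hypothetical harness: 2 hydrogen threads and 1 oxygen thread, n calls each.
// The callbacks only ever run under the H2O mutex, so `out` needs no extra lock.
std::string runMutexH2O(int n) {
    H2O h2o;
    std::string out;
    auto h = [&] { for (int i = 0; i < n; ++i) h2o.hydrogen([&] { out += 'H'; }); };
    auto o = [&] { for (int i = 0; i < n; ++i) h2o.oxygen([&] { out += 'O'; }); };
    std::thread t1(h), t2(h), t3(o);
    t1.join();
    t2.join();
    t3.join();
    return out;
}
```

The strict `OHH` ordering is also why at most one release can be in progress at a time in this version: every release is serialized through the single mutex.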

Atomics Solution:

class H2O {
    /* state is one of {00, 01, 10, 11, 20, 21} where the first digit represents the number of hydrogen atoms acquired and the second digit is the number of oxygen atoms acquired */
    atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and resets back to 0 */
    atomic<int> completedCount_{0};

public:
    H2O() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the acquired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

The code using mutexes is much simpler and also runs roughly 20 times faster on average than the code using atomics when I submit it to Leetcode. I'm trying to better understand when to use locks/mutexes and when to prefer atomics/lock-free code. I have the following questions:

  1. In this case, I don't know how the Leetcode server actually runs the threads for the tests or how many processors/cores it has available. My understanding is that with atomics you should get better throughput, since fewer threads are "waiting" to acquire a lock. However, I'm guessing that in this problem, since atoms can only be released in consecutive sets of 2 hydrogens and 1 oxygen, at most 3 threads can be releasing their atoms concurrently no matter how many threads are running. Is there an example of when the atomic solution to this problem might be expected to be faster than the mutex-based one? Or is this a problem where you would generally expect mutexes to work better to begin with?

  2. Is there a way to more efficiently write a solution using atomics? Maybe some of the while loops and CAS operations are not needed or can be structured differently?

  3. I also tried specifying memory orders for the atomic solution: reads -> memory_order_acquire, writes -> memory_order_release, and read-modify-write (RMW) operations -> memory_order_acq_rel. When I submitted the code a number of times, relaxing the memory order seemed to make the code around 1-2 times faster on average. In general, when writing code using atomics, can you typically specify memory orders as above? How do you decide whether you need true sequential consistency between all atomic operations versus relaxing the semantics depending on whether an operation is a read, write, or RMW?
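As background for the third question, a minimal illustration of the pairing that the acquire/release scheme relies on: a release store, paired with an acquire load that reads the stored value, is enough to publish ordinary (non-atomic) data to the reading thread. Sequential consistency additionally gives all `seq_cst` operations a single total order, which matters mainly when correctness depends on how operations on *different* atomic variables interleave (for example, two threads that each store to their own flag and then load the other's). The `handOff` function below is a hypothetical example, not code from the question.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Message passing: release store + acquire load publish a plain int.
int handOff() {
    int payload = 0;                 // ordinary, non-atomic data
    std::atomic<bool> ready{false};

    std::thread producer([&] {
        payload = 42;                                  // 1) write the data
        ready.store(true, std::memory_order_release);  // 2) publish it
    });
    std::thread consumer([&] {
        // Once this acquire load observes `true`, it synchronizes with the
        // release store, so the write to `payload` is guaranteed visible.
        while (!ready.load(std::memory_order_acquire)) {
            std::this_thread::yield();
        }
        assert(payload == 42);
    });
    producer.join();
    consumer.join();
    return payload;
}
```

The question's atomic code appears to be built from this kind of one-way handoff (one thread resets `state_`, later acquirers read the reset value), which is consistent with acquire/release being sufficient there, but each such pairing is worth reasoning through explicitly rather than assumed.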

I know this is a long post but would really appreciate any thoughts!

Answer by RandomBits

I don't think you can make any performance determinations based on the competition testing without seeing the test harness. Those results seem to be all over the place.

I made a simple test harness that simply spews hydrogens and oxygens from two different threads in the correct proportion. My goal was to isolate the performance of the synchronization code versus the thread overhead.

As expected, std::atomic was much faster than std::mutex for this problem (732 µs versus about 16 ms in the output below). My intuition from experience is that std::atomic is likely to be faster (lower latency), while std::mutex is likely to consume less power (efficient sleeping). The usual caveat applies: you really have to measure the performance for your own use case, OS and hardware.

The one thing I would suggest for your std::atomic code is to call std::this_thread::yield() inside the while loops, so that a thread gives up the rest of its time slice when it cannot make progress. This reduces contention when there are many threads trying to grab the resource.
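A sketch of the atomic version with `std::this_thread::yield()` in the spin loop, plus two simplifications aimed at the question's second point: the two nearly identical CAS loops are folded into one helper, and `complete()` uses the value returned by `fetch_add` to pick the single thread that performs the reset, so the extra `compare_exchange_strong` is not needed. This relies on the fact that no new atom can be acquired until `state_` is reset, so nothing else touches the counter in between. The names `H2OAtomicSimplified` and `runAtomicH2O` are hypothetical, and the variant has not been benchmarked here, so any speed difference is something to measure.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Same encoding as the question: state_ = 10 * hydrogens + oxygens acquired.
class H2OAtomicSimplified {
    std::atomic<int> state_{0};
    std::atomic<int> completedCount_{0};

    // Shared CAS loop: wait until hasRoom(state) holds, then add `step`.
    template <class HasRoom>
    void acquire(int step, HasRoom hasRoom) {
        int cur = state_.load();
        for (;;) {
            if (!hasRoom(cur)) {
                std::this_thread::yield();  // give up the time slice while full
                cur = state_.load();
                continue;
            }
            // On failure, compare_exchange_weak stores the current value in cur.
            if (state_.compare_exchange_weak(cur, cur + step)) return;
        }
    }

    void complete() {
        // fetch_add returns the previous value, so exactly one of the three
        // threads of a molecule sees 2; all slots stay taken until it resets.
        if (completedCount_.fetch_add(1) == 2) {
            completedCount_.store(0);  // reset the counter before reopening slots
            state_.store(0);
        }
    }

public:
    void hydrogen(std::function<void()> releaseHydrogen) {
        acquire(10, [](int s) { return s / 10 < 2; });
        releaseHydrogen();
        complete();
    }
    void oxygen(std::function<void()> releaseOxygen) {
        acquire(1, [](int s) { return s % 10 < 1; });
        releaseOxygen();
        complete();
    }
};

// Hypothetical harness: releases of one molecule may overlap here,
// so the shared output string gets its own lock.
std::string runAtomicH2O(int n) {
    H2OAtomicSimplified h2o;
    std::mutex outMutex;
    std::string out;
    auto emit = [&](char c) { std::lock_guard<std::mutex> lk(outMutex); out += c; };
    auto h = [&] { for (int i = 0; i < n; ++i) h2o.hydrogen([&] { emit('H'); }); };
    auto o = [&] { for (int i = 0; i < n; ++i) h2o.oxygen([&] { emit('O'); }); };
    std::thread t1(h), t2(h), t3(o);
    t1.join();
    t2.join();
    t3.join();
    return out;
}
```

Unlike the mutex version, this design lets the up-to-three releases of one molecule run concurrently and in any order (`HOH`, `OHH`, ...), which is why the release callbacks themselves must be thread-safe.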

Sample Code

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <string_view>
#include <thread>

class H2O {
    std::mutex m;
    std::condition_variable m_cond;
    int idx;
public:
    H2O() : idx(0) {
    }

    void hydrogen(std::function<void()> releaseHydrogen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx != 0;});
        // releaseHydrogen() outputs "H". Do not change or remove this line.
        releaseHydrogen();
        idx = (idx+1) % 3;
        m_cond.notify_all();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        std::unique_lock<std::mutex> mlock(m);
        m_cond.wait(mlock, [this](){return idx == 0;});
        // releaseOxygen() outputs "O". Do not change or remove this line.
        releaseOxygen();
        idx = (idx+1)%3;
        m_cond.notify_all();
    }
};

class H2OAtomic {
    /* state is one of {00, 01, 10, 11, 20, 21} where the first digit represents the number of hydrogen
       atoms acquired and the second digit is the number of oxygen atoms acquired */
    std::atomic<int> state_{0};
    /* stores the number of atoms that we have finished processing, increments from 0 to 3 and resets
       back to 0 */
    std::atomic<int> completedCount_{0};

public:
    H2OAtomic() {}

    void acquireHydrogen(){
        int curState = state_.load();
        do{
            while(curState/10 == 2){
                // full, 2 hydrogen atoms have already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 10));
            // modify the state to show that 1 more hydrogen has been acquired
    }

    void acquireOxygen(){
        int curState = state_.load();
        do{
            while(curState % 10 == 1){
                // full, 1 oxygen has already been acquired
                curState = state_.load();
            }
        } while(!state_.compare_exchange_weak(curState, curState + 1));
            // modify the state to show that 1 oxygen has been acquired
    }

    void complete(){
        // increment count of completed
        completedCount_.fetch_add(1);
        int expected = 3;
        /* The thread that resets the completed count back to 0 is responsible for resetting the acquired state as well.
        If more than 1 acquired thread tries to reset state, in between 2 of these resets a new set of atoms might already be acquired which we don't want to write over. */
        if(completedCount_.compare_exchange_strong(expected, 0)){
            state_.store(0);
        }
    }
    void hydrogen(std::function<void()> releaseHydrogen) {
        acquireHydrogen();
        releaseHydrogen(); // prints "H"
        complete();
    }

    void oxygen(std::function<void()> releaseOxygen) {
        acquireOxygen();
        releaseOxygen(); // prints "O"
        complete();
    }
};

template<class Clock = std::chrono::high_resolution_clock>
class StopWatch
{
public:
    StopWatch()
        : start_tp_(Clock::now())
        , last_tp_(start_tp_)
    { }

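    auto now() const
    {
        // Compiler-only barriers to keep surrounding code from being reordered
        // across the clock read; a relaxed thread fence would have no effect.
        std::atomic_signal_fence(std::memory_order_seq_cst);
        auto current_tp = Clock::now();
        std::atomic_signal_fence(std::memory_order_seq_cst);
        return current_tp;
    }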

    auto mark()
    {
        auto current_tp = now();
        auto elapsed = current_tp - last_tp_;
        last_tp_ = current_tp;
        return elapsed;
    }

    template<class Units = typename Clock::duration>
    auto elapsed_duration()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed);
    }

    template<class Units = typename Clock::duration>
    auto elapsed_time()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed).count();
    }

private:
    typename Clock::time_point start_tp_;
    typename Clock::time_point last_tp_;
};

using std::cout, std::endl;

void release_hydrogen() {
    // cout << "H";
}

void release_oxygen() {
    // cout << "O";
}

template<class Builder, class T, class U>
void build_water(int n, T&& h, U&& o) {
    Builder builder;
    auto h0th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto h1th = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.hydrogen(h);
    });
    auto oth = std::thread([&]() {
        for (auto i = 0; i < n; ++i)
            builder.oxygen(o);
    });

    h0th.join();
    h1th.join();
    oth.join();
}

template<class Work>
void measure(std::string_view desc, Work&& work) {
    StopWatch timer;
    timer.mark();
    work();
    auto n = timer.elapsed_duration<std::chrono::microseconds>().count();
    cout << desc << " : " << n << endl;
}

int main(int argc, const char *argv[]) {
    measure("mutex", [&]() {
        build_water<H2O>(3000, release_hydrogen, release_oxygen);
    });
    measure("atomic", [&]() {
        build_water<H2OAtomic>(3000, release_hydrogen, release_oxygen);
    });

    return 0;
}

Output

mutex : 16447
atomic : 732