My thread-safe queue code appears to work, any possible race conditions, deadlocks, or other design problems?

123 views Asked by At

I am new to using condition_variables and unique_locks in C++. I am working on creating an event loop that polls two custom event-queues and a "boolean" (see integer acting as boolean), which can be acted upon by multiple sources.

I have a demo (below) that appears to work, which I would greatly appreciate if you can review and confirm if it follows the best practices for using unique_lock and condition_variables and any problems you foresee happening (race conditions, thread blocking, etc).

  1. In ThreadSafeQueue::enqueue(...): are we unlocking twice by calling notify and having the unique_lock go out of scope?

  2. In the method TheadSafeQueue::dequeueAll(): We assume it is being called by a method that has been notified (cond.notify), and therefore has been locked. Is there a better way to encapsulate this to keep the caller cleaner?

  3. Do we need to make our class members volatile similar to this?

  4. Is there a better way to mockup our situation that allows us to test if we've correctly implemented the locks? Perhaps without the sleep statements and automating the checking process?

ThreadSafeQueue.h:

#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <vector>

template <class T>
class ThreadSafeQueue {
 public:
  ThreadSafeQueue(std::condition_variable* cond, std::mutex* unvrsl_m)
      : ThreadSafeQueue(cond, unvrsl_m, 1) {}
  ThreadSafeQueue(std::condition_variable* cond, std::mutex* unvrsl_m,
                  uint32_t capacity)
      : cond(cond),
        m(unvrsl_m),
        head(0),
        tail(0),
        capacity(capacity),
        buffer((T*)malloc(get_size() * sizeof(T))),
        scratch_space((T*)malloc(get_size() * sizeof(T))) {}

  std::condition_variable* cond;

  ~ThreadSafeQueue() {
    free(scratch_space);
    free(buffer);
  }

  void resize(uint32_t new_cap) {
    std::unique_lock<std::mutex> lock(*m);
    check_params_resize(new_cap);

    free(scratch_space);
    scratch_space = buffer;
    buffer = (T*)malloc(sizeof(T) * new_cap);
    copy_cyclical_queue();
    free(scratch_space);
    scratch_space = (T*)malloc(new_cap * sizeof(T));

    tail = get_size();
    head = 0;
    capacity = new_cap;
  }
  void enqueue(const T& value) {
    std::unique_lock<std::mutex> lock(*m);
    resize();
    buffer[tail++] = value;
    if (tail == get_capacity()) {
      tail = 0;
    } else if (tail > get_capacity())
      throw("Something went horribly wrong TSQ: 75");
    cond->notify_one();
  }

// Assuming m has already been locked by the caller...
  void dequeueAll(std::vector<T>* vOut) {
    if (get_size() == 0) return;
    scratch_space = buffer;
    copy_cyclical_queue();
    vOut->insert(vOut->end(), buffer, buffer + get_size());
    head = tail = 0;
  }

  // Const functions because they shouldn't be modifying the internal variables
  // of the object
  bool is_empty() const { return get_size() == 0; }
  uint32_t get_size() const {
    if (head == tail)
      return 0;
    else if (head < tail) {
      // 1 2 3
      // 0 1 2
      // 1
      // 0
      return tail - head;
    } else {
      // 3 _ 1 2
      // 0 1 2 3
      // capacity-head + tail+1 = 4-2+0+1 = 2 + 1
      return get_capacity() - head + tail + 1;
    }
  }
  uint32_t get_capacity() const { return capacity; }
  //---------------------------------------------------------------------------
 private:
  std::mutex* m;
  uint32_t head;
  uint32_t tail;
  uint32_t capacity;
  T* buffer;
  T* scratch_space;
  uint32_t get_next_empty_spot();
  void copy_cyclical_queue() {
    uint32_t size = get_size();
    uint32_t cap = get_capacity();
    if (size == 0) {
      return;  // because we have nothing to copy
    }
    if (head + size <= cap) {
      // _ 1 2 3 ... index = 1, size = 3, 1+3 = 4 = capacity... only need 1 copy
      memcpy(buffer, scratch_space + head, sizeof(T) * size);
    } else {
      // 5 1 2 3 4 ... index = 1, size = 5, 1+5 = 6 = capacity... need to copy
      // 1-4 then 0-1

      // copy number of bytes: front = 1, to (5-1 = 4 elements)
      memcpy(buffer, scratch_space + head, sizeof(T) * (cap - head));
      // just copy the bytes from the front up to the first element in the old
      // array
      memcpy(buffer + (cap - head), scratch_space, sizeof(T) * tail);
    }
  }
  void check_params_resize(uint32_t new_cap) {
    if (new_cap < get_size()) {
      std::cerr << "ThreadSafeQueue: check_params_resize: size(" << get_size()
                << ") > new_cap(" << new_cap
                << ")... data "
                   "loss will occur if this happens. Prevented."
                << std::endl;
    }
  }
  void resize() {
    uint32_t new_cap;
    uint32_t size = get_size();
    uint32_t cap = get_capacity();
    if (size + 1 >= cap - 1) {
      std::cout << "RESIZE CALLED --- BAD" << std::endl;
      new_cap = 2 * cap;

      check_params_resize(new_cap);

      free(scratch_space);     // free existing (too small) scratch space
      scratch_space = buffer;  // transfer pointer over
      buffer = (T*)malloc(sizeof(T) * new_cap);  // allocate a bigger buffer
      copy_cyclical_queue();
      // move over everything with memcpy from scratch_space to buffer
      free(scratch_space);  // free what used to be the too-small buffer
      scratch_space =
          (T*)malloc(sizeof(T) * new_cap);  // recreate scratch space

      tail = size;
      head = 0;
      // since we're done with the old array... delete for memory management->

      capacity = new_cap;
    }
  }
};
// Event Types
// keyboard/mouse
// network
// dirty flag

Main.cpp:


#include <unistd.h>

#include <cstdint>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <thread>

#include "ThreadSafeQueue.h"
using namespace std;

void write_to_threadsafe_queue(ThreadSafeQueue<uint32_t> *q,
                               uint32_t startVal) {
  uint32_t count = startVal;
  while (true) {
    q->enqueue(count);
    cout << "Successfully enqueued: " << count << endl;
    count += 2;
    sleep(count);
  }
}

void sleep_and_set_redraw(int *redraw, condition_variable *cond) {
  while (true) {
    sleep(3);
    __sync_fetch_and_or(redraw, 1);
    cond->notify_one();
  }
}

void process_events(vector<uint32_t> *qOut, condition_variable *cond,
                    ThreadSafeQueue<uint32_t> *q1,
                    ThreadSafeQueue<uint32_t> *q2, int *redraw, mutex *m) {
  while (true) {
    unique_lock<mutex> lck(*m);
    cond->wait(lck);
    q1->dequeueAll(qOut);
    q2->dequeueAll(qOut);
    if (__sync_fetch_and_and(redraw, 0)) {
      cout << "FLAG SET" << endl;
      qOut->push_back(0);
    }
    for (auto a : *qOut) cout << a << "\t";
    cout << endl;
    cout << "PROCESSING: " << qOut->size() << endl;
    qOut->clear();
  }
}

void test_2_queues_and_bool() {
  try {
    condition_variable cond;
    mutex m;
    ThreadSafeQueue<uint32_t> q1(&cond, &m, 1024);
    ThreadSafeQueue<uint32_t> q2(&cond, &m, 1024);
    int redraw = 0;
    vector<uint32_t> qOut;
    thread t1(write_to_threadsafe_queue, &q1, 2);
    thread t2(write_to_threadsafe_queue, &q2, 1);
    thread t3(sleep_and_set_redraw, &redraw, &cond);
    thread t4(process_events, &qOut, &cond, &q1, &q2, &redraw, &m);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
  } catch (system_error &e) {
    cout << "MAIN TEST CRASHED" << e.what();
  }
}

int main() { test_2_queues_and_bool(); }
0

There are 0 answers