I have to program a multiple producer-consumer system in C++, but I'm lost trying to put the parts of the model together (each thread with its correct buffer). The basic functioning of the model is this: an initial thread executes a function. The returned results need to be put into an undetermined number of buffers, because each element that the function processes is different and needs to be handled in its own thread. Then, with the data stored in the buffers, another n threads need to take the data from these buffers and apply another function, and the results of that need to be put into buffers again.
At the moment I have created this buffer structure:
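#include <atomic>
#include <memory>
#include <utility>

template <typename T>
class buffer {
public:
    buffer(int n);                       // n is the capacity of the ring
    int bufSize() const noexcept;
    bool bufEmpty() const noexcept;
    bool full() const noexcept;
    ~buffer() = default;
    // `last` marks the final element of the stream
    void put(const T & x, bool last) noexcept;
    // returns the `last` flag together with the value
    std::pair<bool,T> get() noexcept;
private:
    int next_pos(int p) const noexcept;
private:
    struct item {
        bool last;
        T value;
    };
    const int size_;
    std::unique_ptr<item[]> buf_;
    // read and write indices sit on separate cache lines to avoid false sharing
    alignas(64) std::atomic<int> nextRd_ {0};
    alignas(64) std::atomic<int> nextWrt_ {0};
};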
I've also created a vector that stores a collection of buffers, in order to cover the undetermined number of threads.
std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;
for(int i=0; i<n; i++){
v1.push_back(std::unique_ptr<locked_buffer<std::pair<int,std::vector<std::vector<unsigned char>>>>> (new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux)));
}
Edit: [drawing of the model]
Without knowing more context, this looks like an application for a standard thread pool. You have different tasks that are enqueued to a synchronized queue (like the buffer class you have there). Each worker thread of the thread pool polls this queue and processes one task at a time (by executing a run() method, for example). They write the results back into another synchronized queue.
Each worker thread has its own thread-local pair of input and output buffers. They don't need synchronization because they are only accessed from within the owner thread itself.
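A synchronized queue of the kind described above could look roughly like this. It is only a sketch under assumptions: the name SyncQueue and its push/pop/close interface are made up for illustration and are not the buffer class from the question, and it needs C++17 for std::optional.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical blocking queue; name and interface are assumptions.
template <typename T>
class SyncQueue {
public:
    // Add one item and wake a single waiting consumer.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            assert(!closed_ && "push after close");
            q_.push_back(std::move(value));
        }
        cv_.notify_one();
    }

    // Block until an item is available.
    // Returns std::nullopt once the queue is closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return closed_ || !q_.empty(); });
        if (q_.empty()) return std::nullopt;
        T value = std::move(q_.front());
        q_.pop_front();
        return value;
    }

    // Signal that no more items will arrive and wake all waiting consumers.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
    bool closed_ = false;
};
```

Producers call push(), consumers loop on pop() until it returns an empty optional, and whoever knows that the input is finished calls close().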
Edit: Actually, I think this can be simplified a lot: just use a thread pool and one synchronized queue. The worker threads can enqueue new tasks directly into the queue. Each of your threads in the drawing would correspond to one type of task and implement a common Task interface. You don't need multiple buffers. You can use polymorphism and put everything in one buffer.
Edit 2 - Explanation of thread pools:
A thread pool is just a concept. Forget about the pooling aspect and use a fixed number of threads. The main idea is: instead of having several threads that each have a specific function, have N threads that can process any kind of task, where N is the number of CPU cores.
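You can transform the pipeline of dedicated threads and buffers from your drawing into a fixed pool of N identical worker threads that share one task queue.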
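For choosing N, the standard library can report the number of hardware threads. A small sketch, where worker_count and its fallback parameter are hypothetical names introduced here:

```cpp
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: pick N for the pool.
// std::thread::hardware_concurrency() is only a hint and may return 0
// when the value cannot be determined, so a fallback is needed.
inline std::size_t worker_count(std::size_t fallback = 1) {
    assert(fallback > 0);
    const unsigned hint = std::thread::hardware_concurrency();
    return hint != 0 ? hint : fallback;
}
```

The fallback matters because starting zero workers would leave the queue unprocessed.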
Each worker thread simply loops: take the next task from the queue, run it, repeat. This is simplified, but you should get the idea.
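One way to write that loop is sketched below. Everything here is an assumption made for illustration: the Task interface with a run(TaskQueue&) method, the TaskQueue class, and the two example tasks SquareTask and AddTask. The first stage enqueues the second stage into the same queue, which is the "workers enqueue new tasks directly" idea from above.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class TaskQueue;

// Hypothetical common interface: each pipeline stage is one kind of Task.
struct Task {
    virtual ~Task() = default;
    virtual void run(TaskQueue& queue) = 0;  // may push follow-up tasks
};

// Minimal synchronized task queue (an assumption, not the question's buffer).
class TaskQueue {
public:
    void push(std::unique_ptr<Task> task) {
        { std::lock_guard<std::mutex> lock(m_); tasks_.push(std::move(task)); }
        cv_.notify_one();
    }
    // Blocks until a task is available.
    // Returns nullptr after shutdown() once the queue is empty.
    std::unique_ptr<Task> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return nullptr;
        auto task = std::move(tasks_.front());
        tasks_.pop();
        return task;
    }
    void shutdown() {
        { std::lock_guard<std::mutex> lock(m_); done_ = true; }
        cv_.notify_all();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::unique_ptr<Task>> tasks_;
    bool done_ = false;
};

// The worker loop: take a task, run it, repeat until pop() returns nullptr.
inline void worker(TaskQueue& queue) {
    while (auto task = queue.pop()) {
        task->run(queue);
    }
}

// Second stage (example): add a value to a shared total.
struct AddTask : Task {
    AddTask(int v, std::atomic<int>& t) : value(v), total(t) {}
    void run(TaskQueue&) override { total += value; }
    int value;
    std::atomic<int>& total;
};

// First stage (example): square the input and enqueue the second stage.
struct SquareTask : Task {
    SquareTask(int v, std::atomic<int>& t) : value(v), total(t) {}
    void run(TaskQueue& queue) override {
        queue.push(std::make_unique<AddTask>(value * value, total));
    }
    int value;
    std::atomic<int>& total;
};

// Runs SquareTask for 1..n on `threads` workers; returns the sum of squares.
inline int sum_of_squares(int n, std::size_t threads) {
    assert(threads > 0);
    std::atomic<int> total{0};
    TaskQueue queue;
    for (int i = 1; i <= n; ++i) {
        queue.push(std::make_unique<SquareTask>(i, total));
    }
    queue.shutdown();  // no more external tasks; follow-ups are still drained
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threads; ++i) {
        pool.emplace_back(worker, std::ref(queue));
    }
    for (auto& t : pool) t.join();
    return total.load();
}
```

Follow-up tasks are always pushed by a worker that is still running, and that worker goes back to pop() afterwards, so nothing is left in the queue when the last worker exits. A real pool would probably want a less simplistic shutdown rule.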
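And your tasks implement a common interface, so you can put Task* pointers into a single queue.
Also, do yourself a favour and use typedefs ;) The long std::vector<std::unique_ptr<locked_buffer<...>>> declaration from your question becomes much shorter once each nested type has a name.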
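A sketch of how the declaration from the question might look with aliases. The alias names (ByteMatrix, Item, Buffer, BufferPtr) and the make_buffers helper are hypothetical, the locked_buffer below is only a stand-in because its real definition is not shown, and aux is assumed to be an int.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for the question's locked_buffer, whose definition is not shown;
// only a constructor taking one int argument is assumed here.
template <typename T>
class locked_buffer {
public:
    explicit locked_buffer(int capacity) : capacity_(capacity) {}
    int capacity() const noexcept { return capacity_; }
private:
    int capacity_;
};

// Aliases (hypothetical names) give each layer of the nested type a name.
using ByteMatrix = std::vector<std::vector<unsigned char>>;
using Item       = std::pair<int, ByteMatrix>;
using Buffer     = locked_buffer<Item>;
using BufferPtr  = std::unique_ptr<Buffer>;

// Builds n buffers; `aux` is the constructor argument used in the question.
inline std::vector<BufferPtr> make_buffers(int n, int aux) {
    assert(n >= 0);
    std::vector<BufferPtr> v1;
    v1.reserve(static_cast<std::size_t>(n));
    for (int i = 0; i < n; ++i) {
        v1.push_back(std::make_unique<Buffer>(aux));
    }
    return v1;
}
```

std::make_unique (C++14) also removes the need to spell out the type twice, as the `new` expression in the original loop does.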