At the moment I am writing some kind of Fork/Join pattern using std::threads. Therefore I wrote a wrapper class for std::thread which uses a reference counter for all children.
Whenever a child finishes its execution the reference counter is decremented and a notification is sent to all waiting threads. The waiting threads wait for the reference counter to become 0 which means all child threads finished their execution.
Unfortunately, it seems that sometimes the notification is being missed. I've debugged the program using gdb which showed me that the reference counter in the deepest blocking thread was actually already 0 but it didn't recognize it.
The class is called ThreadAttachment:
/**
* \brief For the \p ThreadScheduler the attachment object is a thread itself since for each task a single thread is created.
*
* Children management is required for the fork/join model. It is realized by using an atomic reference counter.
* The reference counter is initially set or changed dynamically by threadsafe operations.
* It is decreased automatically whenever a child task finishes its execution.
*/
class ThreadAttachment : public Attachment
{
public:
/**
* Creates a new thread attachment without creating the actual thread nor starting it.
* \param task The thread attachment is created for the corresponding task \p task.
*/
ThreadAttachment(Task *task);
virtual ~ThreadAttachment();
/**
* Sets the counter of the child tasks.
* \note Threadsafe.
*/
void setChildCount (int count);
/**
* Increments the counter of the child tasks by one.
* \note Threadsafe.
*/
void incrementChildCount();
/**
* Decrements the counter of the child tasks by one.
*
* Besides it notifies \ref m_childrenConditionVariable for all threads which means that all threads which are calling \ref joinChildren() are being awakened.
* \note Threadsafe.
*/
void decrementChildCount();
/**
* \return Returns the counter of the child tasks.
* \note Threadsafe.
*/
int childCount();
/**
* Joins all added children thread attachments.
* Waits for notifications of \ref m_childrenConditionVariable if the counter of child tasks is not already 0.
* Checks on each notification for the counter to become 0. If the counter is finally 0 it stops blocking and continues the execution.
*/
void joinChildren();
/**
* Allocates the actualy std::thread instance which also starts the thread immdiately.
* The thread executes the corresponding task safely when executed itself by the operating systems thread scheduler.
* \note This method should only be called once.
*/
void start();
/**
* Joins the previously with \ref start() allocated and started std::thread.
* If the std::thread is already done it continues immediately.
*/
void join();
/**
* Detaches the previously with \ref start() allocated and started std::thread.
* This releases the thread as well as any control.
*/
void detach();
private:
/**
* The thread is created in \ref start().
* It must be started after all attachment properties have been set properly.
*/
std::unique_ptr<std::thread> m_thread;
/**
* This mutex protects concurrent operations on \ref m_thread.
*/
std::mutex m_threadMutex;
/**
* A reference counter for all existing child threads.
* If this value is 0 the thread does not have any children.
*/
std::atomic_int m_childrenCounter;
/**
* This mutex is used for the condition variable \ref m_childrenConditionVariable when waiting for a notification.
*/
std::mutex m_childrenConditionVariableMutex;
/**
* This condition variable is used to signal this thread whenever one of his children finishes and its children counter is decreased.
* Using this variable it can wait in \ref join() for something to happen.
*/
std::condition_variable m_childrenConditionVariable;
};
The method start() starts the thread:
void ThreadAttachment::start()
{
/*
* Use one single attachment object only once for one single task.
* Do not recycle it to prevent confusion.
*/
assert(this->m_thread.get() == nullptr);
ThreadAttachment *attachment = this;
/*
* Lock the mutex to avoid data races on writing the unique pointer of the thread which is not threadsafe itself.
* When the created thread runs it can write data to itself safely.
* It is okay to lock the mutex in the method start() since the creation of the thread does not block.
* It immediately returns to the method start() in the current thread.
*/
std::mutex &mutex = this->m_threadMutex;
{
std::lock_guard<std::mutex> lock(mutex);
/*
* The attachment should stay valid until the task itself is destroyed.
* So it can be passed safely.
*
* http://stackoverflow.com/a/7408135/1221159
*
* Since this call does not block and the thread's function is run concurrently the mutex will be unlocked and then the thread can acquire it.
*/
this->m_thread.reset(new std::thread([attachment, &mutex]()
{
/*
* Synchronize with the thread's creation.
* This lock will be acquired after the method start() finished creating the thread.
* It is used as simple barrier but should not be hold for any time.
* Otherwise potential deadlocks might occur if multiple locks are being hold especially in decreaseParentsChildrenCounter()
*/
{
std::lock_guard<std::mutex> lock(mutex);
}
attachment->correspondingTask()->executeSafely();
/*
* After spawning and joining in the task's logic there should be no more children left.
*/
assert(attachment->childCount() == 0);
/*
* Finally the children counter of the parent task has to be decreased.
* This has to be done by the scheduler since it is a critical area (access of the different attachments) and therefore must be locked.
*/
ThreadScheduler *scheduler = dynamic_cast<ThreadScheduler*>(attachment->correspondingTask()->scheduler());
assert(scheduler);
scheduler->decreaseParentsChildrenCounter(attachment);
}));
}
}
This is the method decreaseParentsChildrenCounter() of the class ThreadScheduler:
void ThreadScheduler::decreaseParentsChildrenCounter(ThreadAttachment *attachment)
{
{
std::lock_guard<std::mutex> lock(this->m_mutex);
Task *child = attachment->correspondingTask();
assert(child != nullptr);
Task *parent = child->parent();
if (parent != nullptr)
{
Attachment *parentAttachment = this->attachment(parent);
assert(parentAttachment);
ThreadAttachment *parentThreadAttachment = dynamic_cast<ThreadAttachment*>(parentAttachment);
assert(parentThreadAttachment);
/*
* The parent's children counter must still be greater than 0 since this child is still missing.
*/
assert(parentThreadAttachment->childCount() > 0);
parentThreadAttachment->decrementChildCount();
}
}
}
It basically calls decrementChildCount() for the parent thread.
The method joinChildren() waits for all children to be finished:
void ThreadAttachment::joinChildren()
{
/*
* Since the condition variable is notified each time the children counter is decremented
* it will always awake the wait call.
* Otherwise the predicate check will make sure that the parent thread continues work.
*/
std::unique_lock<std::mutex> l(this->m_childrenConditionVariableMutex);
this->m_childrenConditionVariable.wait(l,
[this]
{
/*
* When the children counter reached 0 no more children are executing and the parent can continue its work.
*/
return this->childCount() == 0;
}
);
}
These are the atomic counter operations and as you can see I do send a notification whenever the value is decremented:
void ThreadAttachment::setChildCount(int counter)
{
this->m_childrenCounter = counter;
}
void ThreadAttachment::incrementChildCount()
{
this->m_childrenCounter++;
}
void ThreadAttachment::decrementChildCount()
{
this->m_childrenCounter--;
/*
* The counter should never be less than 0.
* Otherwise it has not been initialized properly.
*/
assert(this->childCount() >= 0);
/*
* Notify all thread which call joinChildren() which should usually only be its parent thread.
*/
this->m_childrenConditionVariable.notify_all();
}
int ThreadAttachment::childCount()
{
return this->m_childrenCounter.load();
}
As test case I calculate a Fibonacci number recursively with the Fork/Join pattern. I thought that if the notification is missed it should check the predicate and detect the children counter to be 0. Apparently the value becomes 0 so how can it be missed?
Update the variables affecting the condition (in this case the member
count
) only within a lock for the mutex corresponding to the condition (this->m_childrenConditionVariableMutex
).See this answer for the reasoning.