I am developing a message queue between two processes on Windows. I would like to support multiple producers and one consumer. The queue must not be corrupted by the crash of one of the processes; that is, the other processes are not affected by the crash, and when the crashed process is restarted it can continue communication (with the new, updated state).
Assume that the event objects in these snippets are wrappers for named Windows auto-reset events and that the mutex objects are wrappers for named Windows mutexes (I used the C++ non-interprocess mutex type as a placeholder).
This is the producer side:
    void producer()
    {
        for (;;)
        {
            // Multiple producers modify _writeOffset so must be given exclusive access
            unique_lock<mutex> excludeProducers(_producerMutex);
            // A snapshot of the readOffset is sufficient because we use _notFullEvent.
            long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
            // while is required because _notFullEvent.Wait might return because it was abandoned
            while (IsFull(readOffset, _writeOffset))
            {
                _notFullEvent.Wait(INFINITE);
                readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
            }
            // use a mutex to protect the resource from the consumer
            {
                unique_lock<mutex> lockResource(_resourceMutex);
                produce(_writeOffset);
            }
            // update the state
            InterlockedExchange(&_writeOffset, IncrementOffset(_writeOffset));
            _notEmptyEvent.Set();
        }
    }
Similarly, this is the consumer side:
    void consumer()
    {
        for (;;)
        {
            long writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
            while (IsEmpty(_readOffset, writeOffset))
            {
                _notEmptyEvent.Wait(INFINITE);
                writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
            }
            {
                unique_lock<mutex> lockResource(_resourceMutex);
                consume(_readOffset);
            }
            InterlockedExchange(&_readOffset, IncrementOffset(_readOffset));
            _notFullEvent.Set();
        }
    }
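The helpers IsFull, IsEmpty and IncrementOffset are not shown above. For readers who want something concrete, here is a minimal sketch of what they might look like, assuming a ring buffer that keeps one slot free so that "full" and "empty" can be told apart from the two offsets alone. The value of QUEUE_SIZE and the one-free-slot convention are assumptions for illustration, not necessarily what the real code does.

```cpp
#include <cassert>

// Hypothetical slot count; with one slot kept free, at most
// QUEUE_SIZE - 1 items can be queued at any time.
constexpr long QUEUE_SIZE = 8;

// Advance an offset by one slot, wrapping around at the end of the buffer.
constexpr long IncrementOffset(long offset)
{
    return (offset + 1) % QUEUE_SIZE;
}

// The queue is empty when the reader has caught up with the writer.
constexpr bool IsEmpty(long readOffset, long writeOffset)
{
    return readOffset == writeOffset;
}

// The queue is full when writing one more item would make it look empty.
constexpr bool IsFull(long readOffset, long writeOffset)
{
    return IncrementOffset(writeOffset) == readOffset;
}
```

With this convention each offset has a single writer (the producers for _writeOffset, serialized by _producerMutex, and the consumer for _readOffset), which is why a snapshot of the other side's offset is enough for the checks.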
Are there any race conditions in this implementation? Is it indeed protected against crashes as required?
P.S. The queue meets the requirements if the state of the queue is protected. If the crash occurs inside produce(i) or consume(i), the contents of those slots might be corrupted; other means will be used to detect, and maybe even correct, such corruption. Those means are outside the scope of this question.
There is indeed a race condition in this implementation. Thank you @VTT for pointing it out.
@VTT wrote that if the producer dies right before _notEmptyEvent.Set(); then the consumer may get stuck forever.
Well, maybe not forever, because when the producer is restarted it will add an item and wake up the consumer again. But the state has indeed been corrupted. If, for instance, this happens QUEUE_SIZE times, the producer will see that the queue is full (IsFull() will return true) and will wait on _notFullEvent, while the consumer is still waiting on _notEmptyEvent. This is a deadlock.
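To make the scenario concrete, here is a small single-threaded model of the queue state. It is only a sketch: QueueModel, kQueueSize and the function names are hypothetical, the events are reduced to boolean flags, and it assumes the consumer was already parked in _notEmptyEvent.Wait() before the first item arrived.

```cpp
#include <cassert>

// Hypothetical slot count; one slot stays free, so capacity is 3 items.
constexpr long kQueueSize = 4;

struct QueueModel {
    long readOffset = 0;
    long writeOffset = 0;
    bool notEmptySignaled = false;  // stands in for _notEmptyEvent
    bool notFullSignaled = false;   // stands in for _notFullEvent

    static long Next(long offset) { return (offset + 1) % kQueueSize; }
    bool IsFull() const { return Next(writeOffset) == readOffset; }

    // The producer publishes one item and then "crashes" before calling
    // _notEmptyEvent.Set(). Returns false if it would block instead.
    bool ProduceThenCrashBeforeSet()
    {
        if (IsFull()) return false;
        writeOffset = Next(writeOffset);
        return true;  // notEmptySignaled is deliberately left untouched
    }

    // The consumer is assumed to be asleep in Wait(), so it only runs
    // again once the not-empty event is signaled.
    bool ConsumerBlocked() const { return !notEmptySignaled; }

    // The producer blocks when the queue is full and nobody signaled it.
    bool ProducerBlocked() const { return IsFull() && !notFullSignaled; }
};

// Returns how many crash-and-restart cycles it takes until both sides are
// blocked, or -1 if the model does not end up deadlocked.
int CrashesUntilDeadlock()
{
    QueueModel q;
    int crashes = 0;
    while (q.ProduceThenCrashBeforeSet()) ++crashes;
    return (q.ConsumerBlocked() && q.ProducerBlocked()) ? crashes : -1;
}
```

In this model the queue fills up after kQueueSize - 1 lost wake-ups, at which point the restarted producer waits on the not-full event and the consumer is still waiting on the not-empty event.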
I am considering the following solution to this, adding the commented code on the producer side. A similar addition should be made on the consumer side:
This will cause the producer to wake up the consumer whenever it gets the chance to run, if indeed the queue is now not empty. This is looking more like a solution based on condition variables, which would have been my preferred pattern, were it not for the unfortunate fact that on Windows, condition variables are not named and therefore cannot be shared between processes.
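A minimal sketch of what such a re-signal could look like on the producer side follows. It is a portable, in-process model rather than the real code: AutoResetEvent here is a stand-in built on std::atomic (the real wrapper would presumably call SetEvent and WaitForSingleObject), TryWait models a wait with a zero timeout, and SharedState, kQueueSize and ProducerIteration are hypothetical names. The locking from the original snippet is omitted to keep the example short.

```cpp
#include <atomic>
#include <cassert>

// In-process stand-in for a named auto-reset event (assumption).
class AutoResetEvent {
public:
    void Set() { signaled_.store(true); }
    // Like a wait with zero timeout: returns true and resets the event
    // if it was signaled, otherwise returns false immediately.
    bool TryWait() { return signaled_.exchange(false); }
private:
    std::atomic<bool> signaled_{false};
};

constexpr long kQueueSize = 4;  // hypothetical; one slot is kept free
inline long Next(long offset) { return (offset + 1) % kQueueSize; }

struct SharedState {
    std::atomic<long> readOffset{0};
    std::atomic<long> writeOffset{0};
    AutoResetEvent notEmpty;
    AutoResetEvent notFull;
};

// One pass through the producer loop. crashBeforeSet simulates the process
// dying right before _notEmptyEvent.Set(). Returns false where the real
// producer would wait on the not-full event.
bool ProducerIteration(SharedState& s, bool crashBeforeSet)
{
    long read = s.readOffset.load();
    long write = s.writeOffset.load();

    // Proposed addition: if items are already pending, signal again in
    // case an earlier producer died before it could call Set().
    if (read != write) s.notEmpty.Set();

    if (Next(write) == read) return false;  // full: real code waits here

    // produce(write) would go here
    s.writeOffset.store(Next(write));
    if (crashBeforeSet) return true;        // wake-up is lost
    s.notEmpty.Set();
    return true;
}
```

In this model a wake-up lost by one crash is repaired at the start of the next producer iteration, including the case where the queue is already full. Whether that is sufficient in the real multi-process setting is exactly what is being asked.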
If this solution is voted correct, I will edit the original post with the complete code.