Is this implementation of inter-process Producer Consumer correct and safe against process crash?

I am developing a message queue between two processes on Windows. I would like to support multiple producers and one consumer. The queue must not be corrupted by the crash of one of the processes; that is, the other processes are not affected by the crash, and when the crashed process is restarted it can continue communication (with the new, updated state).

Assume that the event objects in these snippets are wrappers for named Windows auto-reset events, and the mutex objects are wrappers for named Windows mutexes (I used the C++ standard-library mutex type, which is not inter-process, as a placeholder).
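Something along these lines is what I mean by the wrappers (the class name and the comments are illustrative only, not my real code):

#include <windows.h>

// Illustrative sketch of the event wrapper: a named, auto-reset Windows event,
// created or opened by name so that all processes share the same kernel object.
class NamedAutoResetEvent
{
public:
    explicit NamedAutoResetEvent(const wchar_t* name)
        : _handle(CreateEventW(nullptr, FALSE /*auto-reset*/, FALSE, name)) {}
    ~NamedAutoResetEvent() { CloseHandle(_handle); }
    void Set()             { SetEvent(_handle); }
    void Wait(DWORD ms)    { WaitForSingleObject(_handle, ms); }
private:
    HANDLE _handle;
};

// The mutex wrapper is analogous, built on CreateMutexW / WaitForSingleObject /
// ReleaseMutex, so it can also be named and shared between processes.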

This is the producer side:

void producer()
{
    for (;;)
    {
        // Multiple producers modify _writeOffset so must be given exclusive access

        unique_lock<mutex> excludeProducers(_producerMutex);

        // A snapshot of the readOffset is sufficient because we use _notFullEvent.

        long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);

        // while is required because _notFullEvent.Wait might return because it was abandoned

        while (IsFull(readOffset, _writeOffset))
        {
            _notFullEvent.Wait(INFINITE);

            readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);
        }

        // use a mutex to protect the resource from the consumer
        {
            unique_lock<mutex> lockResource(_resourceMutex);
            produce(_writeOffset);
        }

        // update the state

        InterlockedExchange(&_writeOffset, IncrementOffset(_writeOffset));
        _notEmptyEvent.Set();
    }
}
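(As an aside, the InterlockedCompareExchange(&x, 0, 0) calls are just the usual idiom for an atomic read of a long that lives in shared memory; they behave like the helper sketched below, which is illustrative only.)

// Illustrative helper only: comparing against 0 and "exchanging" with 0 never
// modifies the value; the call simply returns the current contents with a full
// memory barrier, i.e. an atomic read of the shared long.
long AtomicRead(volatile long* value)
{
    return InterlockedCompareExchange(value, 0, 0);
}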

Similarly, this is the consumer side:

void consumer()
{
    for (;;)
    {
        long writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);

        while (IsEmpty(_readOffset, writeOffset))
        {
            _notEmptyEvent.Wait(INFINITE);
            writeOffset = InterlockedCompareExchange(&_writeOffset, 0, 0);
        }

        {
            unique_lock<mutex> lockResource(_resourceMutex);
            consume(_readOffset);
        }

        InterlockedExchange(&_readOffset, IncrementOffset(_readOffset));
        _notFullEvent.Set();
    }
}
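For completeness, IsEmpty, IsFull and IncrementOffset are the usual circular-buffer helpers over the two offsets; roughly like this (QUEUE_SIZE is just an example value):

// Assumed circular-buffer bookkeeping; one slot is kept unused so that a full
// queue can be distinguished from an empty one.
const long QUEUE_SIZE = 16;  // illustrative capacity

long IncrementOffset(long offset)
{
    return (offset + 1) % QUEUE_SIZE;
}

bool IsEmpty(long readOffset, long writeOffset)
{
    return readOffset == writeOffset;
}

bool IsFull(long readOffset, long writeOffset)
{
    return IncrementOffset(writeOffset) == readOffset;
}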

Are there any race conditions in this implementation? Is it indeed protected against crashes as required?

P.S. The queue meets the requirements if the state of the queue is protected. If the crash occurred within produce(i) or consume(i), the contents of those slots might be corrupted, and other means will be used to detect and maybe even correct that corruption. Those means are out of the scope of this question.

There are 2 answers

Answer by David Sackstein:

There is indeed a race condition in this implementation. Thank you @VTT for pointing it out.

@VTT wrote that if the producer dies right before _notEmptyEvent.Set(); then the consumer may get stuck forever.

Well, maybe not forever, because when the producer is restarted it will add an item and wake the consumer again. But the state has indeed been corrupted. If, for instance, this happens QUEUE_SIZE times, the producer will see that the queue is full (IsFull() will return true) and it will wait. This is a deadlock.

I am considering the following solution to this, adding the commented code on the producer side. A similar addition should be made on the consumer side:

void producer()
{
    for (;;)
    {
        // Multiple producers modify _writeOffset so must be given exclusive access

        unique_lock<mutex> excludeProducers(_producerMutex);

        // A snapshot of the readOffset is sufficient because we use _notFullEvent.

        long readOffset = InterlockedCompareExchange(&_readOffset, 0, 0);

        // ====================== Added begin

        if (!IsEmpty(readOffset, _writeOffset))
        {
            _notEmptyEvent.Set();
        }

        // ======================= end Added

        // while is required because _notFullEvent.Wait might return because it was abandoned

        while (IsFull(readOffset, _writeOffset))
        // ... (the rest of producer() continues unchanged from the original post)

This will cause the producer to wake the consumer whenever it gets a chance to run, if the queue is indeed not empty. This looks more like a solution based on condition variables, which would have been my preferred pattern, were it not for the unfortunate fact that Windows condition variables cannot be named and therefore cannot be shared between processes.
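For comparison, the condition-variable shape of the producer would be roughly the following; it is shown only to illustrate the pattern, since CONDITION_VARIABLE and SRWLOCK objects cannot be named or shared across processes:

// Intra-process sketch only, for comparison with the event-based code above.
SRWLOCK queueLock = SRWLOCK_INIT;
CONDITION_VARIABLE notFull = CONDITION_VARIABLE_INIT;
CONDITION_VARIABLE notEmpty = CONDITION_VARIABLE_INIT;

void producerWithConditionVariables()
{
    AcquireSRWLockExclusive(&queueLock);
    while (IsFull(_readOffset, _writeOffset))
    {
        // Atomically releases the lock, waits, and reacquires it on wake-up.
        SleepConditionVariableSRW(&notFull, &queueLock, INFINITE, 0);
    }
    produce(_writeOffset);
    _writeOffset = IncrementOffset(_writeOffset);
    WakeConditionVariable(&notEmpty);
    ReleaseSRWLockExclusive(&queueLock);
}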

If this solution is voted correct, I will edit the original post with the complete code.

Answer by Harry Johnston:

So there are a few problems with the code posted in the question:

  • As already noted, there's a marginal race condition: if the queue were to become full and all the active producers crashed before setting _notEmptyEvent, your code would deadlock. Your answer correctly resolves that problem by setting the event at the start of the loop rather than at the end.

  • You're over-locking; there's typically little point in having multiple producers if only one of them can be producing at a time. Letting them produce concurrently means they can't write directly into shared memory while producing, so each producer will need a local cache. (It isn't impossible to have multiple producers writing directly into different slots in the shared memory, but it would make robustness much more difficult to achieve.)

  • Similarly, you typically need to be able to produce and consume simultaneously, and your code doesn't allow this.

Here's how I'd do it, using a single mutex (shared by the consumer and all of the producers) and two auto-reset event objects.

void consumer(void)
{
    claim_mutex();
    for (;;)
    {
        if (!IsFull(*read_offset, *write_offset))
        {
            // Queue is not full, make sure at least one producer is awake
            SetEvent(notFullEvent);
        }

        while (IsEmpty(*read_offset, *write_offset))
        {
            // Queue is empty, wait for producer to add a message
            release_mutex();
            WaitForSingleObject(notEmptyEvent, INFINITE);
            claim_mutex();
        }

        release_mutex();
        consume(*read_offset);
        claim_mutex();

        *read_offset = IncrementOffset(*read_offset);
    }
}

void producer(void)
{
    claim_mutex();
    for (;;)
    {
        if (!IsEmpty(*read_offset, *write_offset))
        {
            // Queue is not empty, make sure consumer is awake
            SetEvent(notEmptyEvent);
        }

        if (!IsFull(*read_offset, *write_offset))
        {
            // Queue is not full, make sure at least one other producer is awake
            SetEvent(notFullEvent);
        }

        release_mutex();
        produce_in_local_cache();
        claim_mutex();

        while (IsFull(*read_offset, *write_offset))
        {
            // Queue is full, wait for consumer to remove a message
            release_mutex();
            WaitForSingleObject(notFullEvent, INFINITE);
            claim_mutex();
        }

        copy_from_local_cache_to_shared_memory(*write_offset);
        *write_offset = IncrementOffset(*write_offset);
    }
}
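claim_mutex() and release_mutex() above are left undefined; a minimal sketch, assuming a named mutex created elsewhere with CreateMutexW, might look like the following. Treating WAIT_ABANDONED as a successful claim is what lets the scheme keep working if a process dies while holding the mutex, since the offsets are only ever updated with single aligned writes.

#include <windows.h>

// Hypothetical wrapper around a named mutex shared by all of the processes.
// The handle is assumed to have been obtained with something like
// CreateMutexW(nullptr, FALSE, L"Local\\MessageQueueMutex").
static HANDLE queueMutex;

void claim_mutex(void)
{
    DWORD result = WaitForSingleObject(queueMutex, INFINITE);
    // WAIT_ABANDONED means a previous owner crashed while holding the mutex.
    // We own it now regardless; because the shared offsets are only modified
    // by single aligned long-sized writes, the queue state is still usable.
    if (result != WAIT_OBJECT_0 && result != WAIT_ABANDONED)
    {
        // A real implementation would report the error (e.g. invalid handle).
    }
}

void release_mutex(void)
{
    ReleaseMutex(queueMutex);
}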