Condition variables and real time priorities in pthreads


I have two threads, a producer and a consumer. The producer thread receives data from another program through a named pipe at varying rates and forwards it to the consumer thread through a queue. The scheduler policy is RR and the producer thread has higher priority than the consumer thread. I want the producer to signal that there is new data in the queue, and I want the consumer to wait until the producer blocks, which will happen when there is no data to read from the named pipe.

The main thread sets the priorities:

policy = SCHED_FIFO;

pthread_attr_init(&tattr);
/* use the policy and priority stored in tattr instead of inheriting main's */
pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED);
pthread_attr_setscope(&tattr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setschedpolicy(&tattr, policy);

/* producer: highest real-time priority */
param.sched_priority = sched_get_priority_max(policy);
pthread_attr_setschedparam(&tattr, &param);
pthread_create(&tid[0], &tattr, producer, fifo);

/* consumer: lowest real-time priority */
param.sched_priority = sched_get_priority_min(policy);
pthread_attr_setschedparam(&tattr, &param);
pthread_create(&tid[1], &tattr, consumer, fifo);
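The snippet above ignores every return value. With PTHREAD_EXPLICIT_SCHED and a real-time policy, pthread_create() fails with EPERM if the process is not allowed to use that policy (on Linux, typically when it runs without root, CAP_SYS_NICE or a sufficient RLIMIT_RTPRIO), so it is worth checking that each thread really got the policy and priority it asked for. A minimal sketch, assuming Linux; start_fifo_thread(), struct sched_report and report_sched() are hypothetical names, not part of the question's code:

```c
#include <errno.h>     /* EPERM */
#include <pthread.h>
#include <sched.h>
#include <string.h>

/* Hypothetical helper: run fn(arg) in a new thread scheduled with
 * SCHED_FIFO at priority prio. Returns 0, or the error code of the first
 * pthread call that failed (EPERM from pthread_create() when the process
 * may not use real-time scheduling). */
static int start_fifo_thread(pthread_t *tid, int prio,
                             void *(*fn)(void *), void *arg)
{
    pthread_attr_t attr;
    struct sched_param param;
    int rc = pthread_attr_init(&attr);

    if (rc != 0)
        return rc;
    /* Without PTHREAD_EXPLICIT_SCHED the new thread inherits its creator's
     * policy and priority, and the two settings below are ignored. */
    rc = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    if (rc == 0)
        rc = pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
    if (rc == 0) {
        memset(&param, 0, sizeof param);
        param.sched_priority = prio;
        rc = pthread_attr_setschedparam(&attr, &param);
    }
    if (rc == 0)
        rc = pthread_create(tid, &attr, fn, arg);
    pthread_attr_destroy(&attr);
    return rc;
}

/* What a thread actually runs with, as reported by report_sched(). */
struct sched_report {
    int policy;
    int prio;
};

/* Thread body: record this thread's real scheduling policy and priority. */
static void *report_sched(void *arg)
{
    struct sched_report *r = arg;
    struct sched_param p;

    if (pthread_getschedparam(pthread_self(), &r->policy, &p) == 0)
        r->prio = p.sched_priority;
    return NULL;
}
```

With a helper like this, start_fifo_thread(&tid[0], sched_get_priority_max(SCHED_FIFO), producer, fifo) could replace the first group of calls, reporting strerror(rc) on failure. pthread_attr_setscope() is left out because Linux only supports PTHREAD_SCOPE_SYSTEM anyway.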

The producer does this:

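fd = open(pipe, O_RDONLY);
/* read() returns 0 at EOF and -1 on error: stop on either.
   Assumes char buf[1024] and ssize_t n. */
while ((n = read(fd, buf, sizeof buf - 1)) > 0) {
    buf[n] = '\0';    /* atoi() needs a NUL-terminated string */
    val = atoi(buf);
    if (val > SOMETHING) {
        do_something();
    } else {
        pthread_mutex_lock (fifo->mut);
        while (fifo->full) {
            pthread_cond_wait (fifo->notFull, fifo->mut);
        }
        queueAdd (fifo, val);
        pthread_mutex_unlock (fifo->mut);
        pthread_cond_signal (fifo->notEmpty);
    }
}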
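read() on a pipe returns whatever bytes are available at that moment, so a single call can deliver several values, or only part of one, and atoi() converts just the first number in the buffer. If the other program writes one value per line (an assumption; the question does not say how values are delimited), opening the pipe with fopen() and reading it line by line with fgets() avoids the problem. read_values() is a hypothetical helper that collects the values into an array so it can be tested on its own; in the producer, each value would go through the val > SOMETHING check instead.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical replacement for the read()/atoi() loop.
 * ASSUMPTION: the writer sends one integer per line. fgets() hands back one
 * complete line at a time, however the data was split across pipe writes. */
static size_t read_values(FILE *in, int *out, size_t max)
{
    char line[64];
    size_t n = 0;

    while (n < max && fgets(line, sizeof line, in) != NULL)
        out[n++] = atoi(line);   /* the producer would test val / queueAdd() here */
    return n;
}
```

In the producer, this would pair with FILE *in = fopen(pipe, "r"); in place of the open() call.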

The consumer:

while(TRUE){
    pthread_mutex_lock (fifo->mut);
    while (fifo->empty) {
        pthread_cond_wait (fifo->notEmpty, fifo->mut);
    }
    queueDel (fifo, &d);
    do_something_else(d);
    pthread_mutex_unlock (fifo->mut);
    pthread_cond_signal (fifo->notFull);
}

After the signaling, the lower priority thread takes over. What am I doing wrong?

Thanks in advance!

EDIT: Changed the names of the threads. I had renamed them when posting here, because my code is in Spanish and the function names aren't producer/consumer, and I made a mistake. But unfortunately it's not that simple.

What I mean by 'takes over' is that the consumer continues execution. What I want is for it to start if and only if the producer thread blocks or exits.

There are 2 answers

Answer by Freddie Chopin

Try compiling your project with optimizations DISABLED. If they are enabled right now and the variables fifo->empty and fifo->full are NOT declared volatile, there is a good chance that the compiler optimizes parts of your code in a way you wouldn't like: the value is tested only once, and if it is true your code falls into an endless loop (the one with pthread_cond_wait()) that it will never leave, because the compiler cannot see how pthread_cond_wait() could alter it.

If that's not it, then the behavior you describe is still not clear to me... Your example should work in the following way:

  • producer puts something in the queue until it's full, then blocks,
  • consumer takes an item from the queue and signals the producer that there is one free slot,
  • producer wakes up (assuming it's not blocked on read()) and puts one item into the queue (that's the whole free space), then blocks,
  • ...

How is the behavior you see different from what you expect? How can the consumer "take over" when there's no data in the queue? It should stop "taking over" after taking the first (which is also the last) element from the queue, and then wait for the producer to add something.

Answer by abligh

If you were a little clearer about what you mean by 'the lower priority thread takes over', this would be easier to determine. But I don't think that, with the code as it is, this has anything to do with the scheduling policy.

From the POSIX spec (OK, from The Single UNIX Specification, Version 2, but same difference):

If more than one thread is blocked on a condition variable, the scheduling policy determines the order in which threads are unblocked. When each thread unblocked as a result of a pthread_cond_signal() or pthread_cond_broadcast() returns from its call to pthread_cond_wait() or pthread_cond_timedwait(), the thread owns the mutex with which it called pthread_cond_wait() or pthread_cond_timedwait(). The thread(s) that are unblocked contend for the mutex according to the scheduling policy (if applicable), and as if each had called pthread_mutex_lock().

Now, because of your mutex ordering (see the paragraph below about swapping pthread_mutex_unlock and pthread_cond_signal), the signalling thread no longer holds the mutex when pthread_cond_signal is performed. So, on a single-CPU machine, we know the signalled thread is going to get scheduled in regardless of the scheduling policy, and it will instantly grab the mutex before the signalling thread manages to relock it. In other words, because only one thread is contending for the mutex at that moment, the scheduling policy is irrelevant and that thread will always get it.
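The single-CPU argument above assumes both threads compete for the same CPU. On a multicore machine, SCHED_FIFO priorities do not stop the lower-priority consumer from running on another core at the same time as the producer. One way to test the argument, assuming Linux with glibc, is to restrict the program to a single CPU. pin_to_one_cpu() is a hypothetical helper; pthread_getaffinity_np() and pthread_setaffinity_np() are non-portable GNU extensions, hence the _GNU_SOURCE define.

```c
#define _GNU_SOURCE    /* must come before any #include */
#include <pthread.h>
#include <sched.h>

/* Hypothetical helper: pin thread t to the lowest-numbered CPU it is
 * currently allowed to use. Linux/glibc only. Returns the chosen CPU,
 * or -1 on failure. */
static int pin_to_one_cpu(pthread_t t)
{
    cpu_set_t allowed, one;

    if (pthread_getaffinity_np(t, sizeof allowed, &allowed) != 0)
        return -1;
    for (int cpu = 0; cpu < CPU_SETSIZE; cpu++) {
        if (!CPU_ISSET(cpu, &allowed))
            continue;
        CPU_ZERO(&one);
        CPU_SET(cpu, &one);
        return pthread_setaffinity_np(t, sizeof one, &one) == 0 ? cpu : -1;
    }
    return -1;
}
```

Calling pin_to_one_cpu(pthread_self()) in main() before the two pthread_create() calls should be enough, because a new thread inherits its creator's CPU affinity mask; starting the program under taskset -c 0 has a similar effect.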

Is this what you are referring to?

Also, if do_something_else(d) is even slightly time-consuming, you don't want to call it with the mutex held, or it will stop the producer from running.

There may be something else at play here, too: your pthread_cond_signal may not be working because of a race condition. You don't have to call pthread_cond_signal with the mutex held, but it is a good idea IMHO. For instance, in the consumer you unlock the mutex after removing something from the queue, and only then signal. Between dropping the mutex and signalling, the FIFO may have changed state, so referring to fifo->notFull is technically unsafe. I suggest swapping the pthread_mutex_unlock and the pthread_cond_signal around; this may also improve scheduling.
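Putting the two suggestions above together (call do_something_else() only after releasing the mutex, and signal before unlocking), a sketch could look like this. Assumptions: queue, fifoPut(), fifoTake(), QUEUESIZE, the done flag and run_demo() are hypothetical; the queue keeps a count instead of the question's full/empty flags and embeds the mutex and condition variables; and no real-time priorities are requested, so it runs without privileges.

```c
#include <pthread.h>
#include <stdbool.h>

#define QUEUESIZE 4   /* assumption: any small capacity will do */

/* Hypothetical bounded FIFO with embedded mutex and condition variables. */
typedef struct {
    int buf[QUEUESIZE];
    int head, tail, count;
    bool done;                        /* producer has no more data */
    long sum;                         /* result of the demo consumer */
    pthread_mutex_t mut;
    pthread_cond_t notFull, notEmpty;
} queue;

static void fifoPut(queue *q, int val)
{
    pthread_mutex_lock(&q->mut);
    while (q->count == QUEUESIZE)
        pthread_cond_wait(&q->notFull, &q->mut);
    q->buf[q->tail] = val;
    q->tail = (q->tail + 1) % QUEUESIZE;
    q->count++;
    pthread_cond_signal(&q->notEmpty);   /* signal first, then unlock */
    pthread_mutex_unlock(&q->mut);
}

/* Copies the oldest item into *val. Returns false once the producer is
 * done and the queue has been drained. */
static bool fifoTake(queue *q, int *val)
{
    bool got = false;

    pthread_mutex_lock(&q->mut);
    while (q->count == 0 && !q->done)
        pthread_cond_wait(&q->notEmpty, &q->mut);
    if (q->count > 0) {
        *val = q->buf[q->head];
        q->head = (q->head + 1) % QUEUESIZE;
        q->count--;
        got = true;
        pthread_cond_signal(&q->notFull);   /* still holding the mutex */
    }
    pthread_mutex_unlock(&q->mut);
    return got;
}

static void *consumer(void *arg)
{
    queue *q = arg;
    long sum = 0;
    int d;

    while (fifoTake(q, &d))
        sum += d;   /* stands in for do_something_else(d): no lock held */
    q->sum = sum;   /* read by run_demo() only after pthread_join() */
    return NULL;
}

/* Demo: the calling thread produces 1..n while a consumer thread adds them
 * up. Returns the consumer's total, or -1 if the thread can't be created. */
static long run_demo(int n)
{
    queue q = { .count = 0, .done = false, .sum = 0 };
    pthread_t tid;

    pthread_mutex_init(&q.mut, NULL);
    pthread_cond_init(&q.notFull, NULL);
    pthread_cond_init(&q.notEmpty, NULL);
    if (pthread_create(&tid, NULL, consumer, &q) != 0)
        return -1;

    for (int i = 1; i <= n; i++)
        fifoPut(&q, i);

    pthread_mutex_lock(&q.mut);       /* no more data: wake the consumer */
    q.done = true;
    pthread_cond_signal(&q.notEmpty);
    pthread_mutex_unlock(&q.mut);

    pthread_join(tid, NULL);
    pthread_cond_destroy(&q.notEmpty);
    pthread_cond_destroy(&q.notFull);
    pthread_mutex_destroy(&q.mut);
    return q.sum;
}
```

The while loops around pthread_cond_wait() stay in place, so a spurious wakeup, or a wakeup after the other thread has already changed the queue again, is handled by re-checking the condition.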

EDIT

Now redundant: this was the OP's error, which has since been fixed.

Is this actually really simple? In your original message you say:

The scheduler policy is RR and the producer thread has higher priority than the consumer thread.

However in your code:

param.sched_priority = sched_get_priority_max(policy);
pthread_attr_setschedparam(&tattr, &param);
pthread_create(&tid[0], &tattr, consumer, fifo);

param.sched_priority = sched_get_priority_min(policy);
pthread_attr_setschedparam(&tattr, &param);
pthread_create(&tid[1], &tattr, producer, fifo);

To me, that looks like the consumer is given the maximum priority and the producer the minimum.