Consumer / Producer with wait on both ends

161 views Asked by At

I wrote a producer/consumer program using a mutex and a condition variable. It uses a global int on which values are produced and consumed. There is 1 consumer thread and multiple producer threads.

Rules:

  1. When the value is too small, then consumer will wait.

  2. When the value is too big, then producers will wait.

My question is:

We know the consumer normally needs to wait, but whether producers need to wait depends on the design.
In my example both sides need to check a condition and might wait for each other. Is this good practice?
Could it cause a deadlock in the following implementation?

Code:

// condition test, a producer/consumer program,

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Shared counter: producers increment it, the consumer decrements it.
// Every read and write in this file happens while mtx is held.
static int glob = 0; // global variable, shared by threads,
// Mutex guarding glob.
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
// Single condition variable that both the producers and the consumer wait on.
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/**
 * Producer step: increase the shared counter once, with lock.
 *
 * Waits on the condition variable while glob >= max, then increments glob
 * and signals the condition variable once glob has reached point.
 *
 * @param arg
 *  pointer to int[2] = {max, point}
 *  where:
 *      max, value at or above which the producer waits,
 *      point, is min value that would trigger consume,
 *
 * @return (an integer carried in the void pointer)
 *  >0, increased,
 *  <0, error; the mutex is released again on every path that acquired it,
 */
static void *inc(void *arg) {
    const int *args = arg;
    const int max = args[0];
    const int point = args[1];

    intptr_t status = 1;
    int result;

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
        printf("error to get lock: %d\n", result);
        return (void *)(intptr_t)-1; // nothing to unlock, the lock was never taken
    }

    // Re-test the predicate after every wake-up: the same condition variable
    // is shared with the consumer and wake-ups may be spurious.
    while(glob >= max) {
        if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
            printf("failed to wait for condition: %d\n", result);
            status = -1;
            goto unlock; // do not leave the mutex locked on the error path
        }
    }

    // do jobs,
    glob++; // not atomic by itself, which is why it is done under mtx,

    if(glob >= point) { // condition signal
        if((result = pthread_cond_signal(&cond)) != 0) {
            printf("error to condition signal: %d\n", result);
            status = -1;
        }
    }

unlock:
    if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
        printf("error to unlock: %d\n", result);
        status = -1;
    }

    return (void *)status;
}

// Producer thread entry point: calls inc() in a loop and only returns when
// inc() reports an error (negative status), passing that status on as the
// thread's exit value.
static void *inc_loop(void *arg) {
    for(;;) {
        // Keep the status in a pointer-sized integer so the round trip
        // through void * does not truncate.
        intptr_t status = (intptr_t)inc(arg);
        if(status < 0) {
            return (void *)status;
        }
    }
}

/**
 * consumer, with lock,
 *
 * Waits while glob < point (waking any waiting producers before each sleep),
 * then removes up to `steps` from glob and wakes the producers again because
 * capacity was freed.
 *
 * @param arg
 *  pointer to int[2] = {point, steps}
 *  where:
 *      point, is min value that would trigger consume,
 *      steps, is the count each consume would take,
 *
 * @return (an integer carried in the void pointer)
 *  >0, consumed,
 *  <0, error; the mutex is released again on every path that acquired it,
 */
static void *consume(void *arg) {
    const int *args = arg;
    const int point = args[0];
    const int step = args[1];

    intptr_t status = 1;
    int result;

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock
        printf("error to get lock: %d\n", result);
        return (void *)(intptr_t)-1; // nothing to unlock, the lock was never taken
    }

    while(glob < point) {
        // Wake producers that may be parked on the shared condition variable
        // before going to sleep, so somebody can refill the counter.
        if((result = pthread_cond_broadcast(&cond)) != 0) { // broadcast
            printf("failed to broadcast condition: %d\n", result);
            status = -1;
            goto unlock;
        }
        printf("broadcast, and sleep,\n");

        if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait
            printf("failed to wait for condition: %d\n", result);
            status = -1;
            goto unlock; // do not leave the mutex locked on the error path
        }
    }

    // do job
    printf("going to perform consume: %d -> ", glob);
    glob -= (glob >= step ? step : glob); // never take more than is available
    printf("%d\n", glob);

    // The counter just shrank: let producers waiting for room re-check their
    // predicate now instead of only when glob drops below point.
    if((result = pthread_cond_broadcast(&cond)) != 0) {
        printf("failed to broadcast condition: %d\n", result);
        status = -1;
    }

unlock:
    if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock
        printf("error to unlock: %d\n", result);
        status = -1;
    }

    return (void *)status;
}

/**
 * condition test: start thread_count producer threads running func_inc_loop,
 * then call func_consume on the calling thread until it has reported
 * consume_count successful consumes.
 *
 * The producer threads are detached and never stopped; they end when the
 * process exits.
 *
 * @param func_inc_loop  producer thread entry, receives int[2] = {max, point}
 * @param func_consume   consumer step, receives int[2] = {point, step};
 *                       its return value is read as an integer status
 *                       (>0 consumed, <0 error)
 * @param thread_count   number of producer threads, must be >= 1
 * @param max            value at or above which producers wait, must be >= point
 * @param point          min value that would trigger consume
 * @param consume_count  number of successful consumes to perform
 * @param step           max count taken by each consume, must be >= 0
 *
 * @return 0 on success, -1 on invalid arguments, when no producer could be
 *         started, or when func_consume reports an error
 */
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
    // Static storage: the producer threads keep reading these values after
    // this function has returned, so they must not live in this stack frame.
    static int inc_args[2];
    int consume_args[2] = {
        point, // min point to trigger consume,
        step // consume steps
    };

    if(thread_count < 1) {
        printf("thread_count must be at least 1: %d\n", thread_count);
        return -1;
    }
    if(point > max) {
        // Producers would stop below the level the consumer waits for, so
        // every thread would end up waiting forever.
        printf("point (%d) must not be greater than max (%d)\n", point, max);
        return -1;
    }
    if(step < 0) {
        printf("step must not be negative: %d\n", step);
        return -1;
    }

    inc_args[0] = max; // max value that would increase to,
    inc_args[1] = point; // min value that would trigger consume,

    // start threads
    int started = 0;
    for(int i = 0; i < thread_count; i++) {
        pthread_t tid;
        int result = pthread_create(&tid, NULL, func_inc_loop, inc_args);
        if(result != 0) {
            printf("error create thread [%d]: %d\n", i, result);
            continue;
        }
        started++;

        // The handle is not kept because the thread is never joined.
        if((result = pthread_detach(tid)) != 0) {
            printf("error detach thread [%d]: %d\n", i, result);
        }
    }
    if(started == 0) {
        printf("no producer thread could be started\n");
        return -1;
    }

    // begin consume loop,
    int loops = 0;
    while(loops < consume_count) {
        // Read the status as an integer; comparing the raw pointer would
        // treat an error value of -1 as "greater than zero".
        intptr_t status = (intptr_t)func_consume(consume_args);
        if(status < 0) {
            printf("consume failed, giving up\n");
            return -1;
        }
        if(status > 0) {
            loops++;
        }
    }

    printf("\nDone.\n");

    return 0;
}

// Parse `text` as a base-10 int into *out.
// Returns 0 on success; prints a message naming the parameter and returns -1
// when the text is not a complete number or does not fit in an int.
static int parse_int_arg(const char *text, const char *name, int *out) {
    char *end = NULL;

    errno = 0;
    long value = strtol(text, &end, 10);
    if(end == text || *end != '\0' || errno == ERANGE || value < INT_MIN || value > INT_MAX) {
        fprintf(stderr, "invalid value for %s: '%s'\n", name, text);
        return -1;
    }

    *out = (int)value;
    return 0;
}

/**
 * command line:
 *  ./a.out [thread_count] [max] [point] [consume_count] [step]
 *
 * All arguments are optional and positional; omitted ones keep the defaults
 * set below. Exits with EXIT_FAILURE when an argument is not a valid int or
 * when condition_test() reports failure.
 */
int main(int argc, char *argv[]) {
    int thread_count = 3;
    int max = 1000;
    int point = 100;
    int consume_count = 10; // how many times consume execute,
    int step = 200; // max count in each consume,

    // Positional parameters in command-line order: argv[1] .. argv[5].
    const struct {
        const char *name;
        int *value;
    } params[] = {
        {"thread_count", &thread_count},
        {"max", &max},
        {"point", &point},
        {"consume_count", &consume_count},
        {"step", &step},
    };
    const int param_count = (int)(sizeof params / sizeof params[0]);

    // Arguments beyond the known parameters are ignored.
    for(int i = 0; i < param_count && i + 1 < argc; i++) {
        if(parse_int_arg(argv[i + 1], params[i].name, params[i].value) != 0) {
            return EXIT_FAILURE;
        }
    }

    if(condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step) != 0) {
        return EXIT_FAILURE;
    }

    return EXIT_SUCCESS;
}

Compile:

gcc -pthread xxx.c

Execute:

./a.out

1

There is 1 answer

1
n3wcod3r On BEST ANSWER

In practice, you should not use a mutex to solve a producer/consumer or reader-writer problem. It does not necessarily give rise to deadlocks, but it might lead to starvation of either producers or consumers.

I used a similar approach to code a reader/writer lock.

You could check it out: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c