I wrote a producer/consumer program using a mutex and a condition variable.
It uses a global int as the value that is produced and consumed.
There is 1 consumer thread and there are multiple producer threads.
Rules:
When the value is too small, the consumer will wait.
When the value is too big, the producers will wait.
My question is:
We know the consumer normally needs to wait, but whether producers need to wait depends on the design.
In my example both sides need to check a condition and might wait for each other; is this good practice?
Is it possible for the following implementation to deadlock?
Code:
// condition test, a producer/consumer program,
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Shared counter: inc() increments it and consume() decrements it, both while holding mtx.
static int glob = 0; // global variable, shared by threads,
// Mutex locked around every access to glob in inc() and consume().
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
// The single condition variable that both inc() and consume() wait on and notify.
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/**
 * Producer step: increase the shared counter `glob` by one while holding `mtx`.
 *
 * Waits on `cond` for as long as glob >= max. After the increment, signals
 * `cond` once glob has reached `point`, so that a waiting consumer can run.
 *
 * @param arg
 *  int[2] {max, point}
 *  where:
 *  max, upper bound; the producer waits while glob >= max,
 *  point, is min value that would trigger consume,
 *
 * @return (an integer carried in the pointer value)
 *  >0, increased,
 *  <0, error; if the mutex was acquired it is released before returning,
 */
static void *inc(void *arg) {
    const int *args = arg;
    const int max = args[0];
    const int point = args[1];
    intptr_t status = 1;
    int result;

    if ((result = pthread_mutex_lock(&mtx)) != 0) {
        printf("error to get lock: %d\n", result);
        return (void *)(intptr_t)-1;
    }

    // Re-check the predicate after every wake-up: the condition variable is
    // shared with the consumer, so a wake-up does not imply glob < max.
    while (glob >= max) {
        if ((result = pthread_cond_wait(&cond, &mtx)) != 0) {
            printf("failed to wait for condition: %d\n", result);
            status = -1;
            goto unlock; // never return with the mutex still held
        }
    }

    glob++; // not atomic on its own; safe only because mtx is held

    // NOTE(review): a single signal is enough only while point <= max;
    // with point > max nobody ever reaches this branch once glob == max.
    if (glob >= point && (result = pthread_cond_signal(&cond)) != 0) {
        printf("error to condition signal: %d\n", result);
        status = -1;
    }

unlock:
    if ((result = pthread_mutex_unlock(&mtx)) != 0) {
        printf("error to unlock: %d\n", result);
        status = -1;
    }
    return (void *)status;
}
/**
 * Producer thread entry point: calls inc() over and over with the same
 * argument and stops only when inc() reports an error.
 *
 * @param arg  passed through unchanged to inc()
 * @return the negative status returned by the failing inc() call,
 *         carried in the pointer value
 */
static void *inc_loop(void *arg) {
    for (;;) {
        // intptr_t round-trips the status without truncating the pointer.
        intptr_t status = (intptr_t)inc(arg);
        if (status < 0) {
            return (void *)status;
        }
    }
}
/**
 * Consumer step: take up to `step` units from the shared counter `glob`
 * while holding `mtx`.
 *
 * While glob < point it broadcasts on `cond` (waking every thread blocked
 * on it) and then waits on `cond` itself. Once glob >= point it subtracts
 * `step`, or all of glob when glob < step, and prints the transition.
 *
 * @param arg
 *  int[2] {point, steps}
 *  where:
 *  point, is min value that would trigger consume,
 *  steps, is the count each consume would take,
 *
 * @return (an integer carried in the pointer value)
 *  >0, consumed,
 *  <0, error; if the mutex was acquired it is released before returning,
 */
static void *consume(void *arg) {
    const int *args = arg;
    const int point = args[0];
    const int step = args[1];
    intptr_t status = 1;
    int result;

    if ((result = pthread_mutex_lock(&mtx)) != 0) {
        printf("error to get lock: %d\n", result);
        return (void *)(intptr_t)-1;
    }

    while (glob < point) {
        // Wake the other waiters before sleeping; done with mtx held, so
        // none of them can start waiting between the broadcast and our wait.
        if ((result = pthread_cond_broadcast(&cond)) != 0) {
            printf("error to condition broadcast: %d\n", result);
            status = -1;
            goto unlock;
        }
        printf("broadcast, and sleep,\n");
        if ((result = pthread_cond_wait(&cond, &mtx)) != 0) {
            printf("failed to wait for condition: %d\n", result);
            status = -1;
            goto unlock; // never return with the mutex still held
        }
    }

    printf("going to perform consume: %d -> ", glob);
    glob -= (glob >= step ? step : glob); // never take more than is available
    printf("%d\n", glob);

unlock:
    if ((result = pthread_mutex_unlock(&mtx)) != 0) {
        printf("error to unlock: %d\n", result);
        status = -1;
    }
    return (void *)status;
}
/**
 * Runs the producer/consumer experiment: starts `thread_count` producer
 * threads running `func_inc_loop`, then calls `func_consume` on the calling
 * thread until it has succeeded `consume_count` times.
 *
 * @param func_inc_loop  producer thread entry; receives int[2] {max, point}
 * @param func_consume   consumer step; receives int[2] {point, step} and
 *                       returns >0 (consumed), 0 (nothing done) or <0 (error)
 *                       as an integer carried in the pointer value
 * @param thread_count   number of producer threads, must be > 0
 * @param max            value at which producers stop increasing
 * @param point          min value that triggers consume, must be <= max
 * @param consume_count  number of successful consume calls to perform
 * @param step           amount taken by each consume
 *
 * @return 0 on success, -1 on invalid arguments, when no producer could be
 *         started, or when func_consume reports an error
 */
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) {
    // The producer threads are never stopped, so they may still read their
    // arguments after this function returns; keep them in static storage.
    static int inc_args[2];

    if (thread_count <= 0) {
        printf("thread_count must be positive: %d\n", thread_count);
        return -1;
    }
    if (point > max) {
        // Producers stop at max while the consumer waits for point: with
        // point > max every thread ends up waiting forever.
        printf("point (%d) must not exceed max (%d)\n", point, max);
        return -1;
    }

    inc_args[0] = max;   // max value that would increase to,
    inc_args[1] = point; // min value that would trigger consume,

    // start threads
    int started = 0;
    for (int i = 0; i < thread_count; i++) {
        pthread_t tid;
        int result = pthread_create(&tid, NULL, func_inc_loop, inc_args);
        if (result != 0) {
            printf("error create thread [%d]: %d\n", i, result);
            continue;
        }
        // Nobody joins the producers, so detach them to release their
        // resources automatically.
        if ((result = pthread_detach(tid)) != 0) {
            printf("error detach thread [%d]: %d\n", i, result);
        }
        started++;
    }
    if (started == 0) {
        printf("no producer thread could be started\n");
        return -1;
    }

    int consume_args[] = {
        point, // min point to trigger consume,
        step   // consume steps
    };

    // begin consume loop,
    int loops = 0;
    while (loops < consume_count) {
        // Decode the status as an integer; comparing the raw pointer with 0
        // would treat the error value (void *)-1 as a success.
        intptr_t status = (intptr_t)func_consume(consume_args);
        if (status < 0) {
            printf("consume failed: %d\n", (int)status);
            return -1;
        }
        if (status > 0) {
            loops++;
        }
    }
    printf("\nDone.\n");
    return 0;
}
/**
 * Parses `text` as a base-10 int.
 *
 * @param text  the command-line argument to parse
 * @param name  parameter name used in the error message
 * @param out   receives the parsed value on success
 * @return 0 on success, -1 if text is not a complete integer or is out of range
 */
static int parse_int_arg(const char *text, const char *name, int *out) {
    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE || value < INT_MIN || value > INT_MAX) {
        printf("invalid %s: '%s'\n", name, text);
        return -1;
    }
    *out = (int)value;
    return 0;
}

/**
 * command line:
 * ./a.out [thread_count] [max] [point] [consume_count] [step]
 *
 * Every argument is optional; omitted ones keep the defaults below.
 */
int main(int argc, char *argv[]) {
    int thread_count = 3;
    int max = 1000;
    int point = 100;
    int consume_count = 10; // how many times consume execute,
    int step = 200; // max count in each consume,

    // Positional parameters in command-line order.
    struct {
        const char *name;
        int *value;
    } params[] = {
        { "thread_count", &thread_count },
        { "max", &max },
        { "point", &point },
        { "consume_count", &consume_count },
        { "step", &step },
    };
    const int param_count = (int)(sizeof params / sizeof params[0]);

    for (int i = 0; i < param_count && i + 1 < argc; i++) {
        if (parse_int_arg(argv[i + 1], params[i].name, params[i].value) != 0) {
            printf("usage: %s [thread_count] [max] [point] [consume_count] [step]\n", argv[0]);
            return EXIT_FAILURE;
        }
    }

    if (condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step) != 0) {
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
Compile:
gcc -pthread xxx.c
Execute:
./a.out
In practice, you should not rely on a mutex alone to solve a producer/consumer or reader-writer problem. It does not necessarily give rise to deadlocks, but it might lead to starvation of either producers or consumers.
I used a similar approach to code a reader/writer lock.
You could check it out: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c