I'm trying to implement the classic producer-consumer circular buffer example with one producer and one consumer.
However, I'm running into a problem whose cause I can't figure out. The producer seems to work, but the consumer always picks up a 0. I suspected a problem with the shared memory and/or the buffering, but both seem to be implemented correctly.
Code explanation:
-First I create a shared memory area the size of the STRUCT_BUFFER defined in the code below. At this point the struct is actually only used to provide the size.
-I create 3 semaphores: mutex, full, empty.
-fork(): in the child (consumer) I get data from the buffer in a loop; if the number found is -1 (termination token), the consumer puts it back and exits the loop, terminating. In the parent (producer) I run a for loop that puts the values 1 through N on the buffer in turn and, at the end, puts a -1 (termination token).
-Then the parent waits until the consumer has finished and removes the semaphores. The get and put functions are the classic producer-consumer code.
P.S. The SYSC() macro is only used for error handling.
#include <sys/wait.h>
#include <sys/mman.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <semaphore.h>
#define SHM_NAME "/shm8"
#define MUTEX "/mutex"
#define FULL "/full"
#define EMPTY "/empty"
#define N 10
#define SYSC(value, command, message) if ((value=command) == -1){perror(message); exit(errno);}
#define SYSCN(value, command, message) if ((value=command) == NULL){perror(message); exit(errno);}
typedef struct{
    int B[N];
    int *head;
    int *tail;
} STRUCT_BUFFER;
void put(int, int*, int*, int*, sem_t*, sem_t*, sem_t*);
int get(int*, int*, int*, sem_t*, sem_t*, sem_t*);
int main(){
    int ret_value, mem_fd;
    void * ptr;
    pid_t pid;
    sem_t * mutex;
    sem_t * full;
    sem_t * empty;
    int * head;
    int * tail;
    // Creating shared memory area
    SYSC(mem_fd, shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666), "in shm_open");
    SYSC(ret_value, ftruncate(mem_fd, sizeof(STRUCT_BUFFER)), "in ftruncate");
    // Memory mapping
    SYSCN(ptr, mmap(NULL, sizeof(STRUCT_BUFFER), PROT_READ | PROT_WRITE, MAP_SHARED, mem_fd, 0), "in mmap");
    head = tail = ptr;
    // Closing file descriptor
    SYSC(ret_value, close(mem_fd), "in close");
    // Creating semaphores
    mutex = sem_open(MUTEX, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 1);
    if(mutex == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    full = sem_open(FULL, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, N);
    if(full == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    empty = sem_open(EMPTY, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 0);
    if(empty == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    // Creating a new process
    SYSC(pid, fork(), "in fork");
    if(!pid) {
        // Consumer code
        printf("Consumer %d starts\n", getpid());
        int toGet;
        while(1) {
            toGet = get(ptr, head, tail, mutex, full, empty);
            printf("Consumer %d consumed %d\n", getpid(), toGet);
            // If termination token
            if(toGet == -1) {
                put(toGet, ptr, head, tail, mutex, full, empty);
                break;
            }
        }
        sleep(1);
        printf("Consumer %d finishes\n",getpid());
        exit(EXIT_SUCCESS);
    } else {
        // Producer code
        printf("Producer %d starts\n",getpid());
        int toPut;
        int exitToken = -1;
        // Production
        for(int i = 0 ; i < N ; i++) {
            toPut = i + 1;
            put(toPut, ptr, head, tail, mutex, full, empty);
            printf("Producer %d produced %d. Cycle:%d\n", getpid(), toPut, i);
            sleep(1);
        }
        // Termination token
        put(exitToken, ptr, head, tail, mutex, full, empty);
        printf("Producer %d produced termination token\n", getpid());
        sleep(1);
        // Wait
        SYSC(ret_value, wait(NULL), "in wait");
        // Closing semaphores
        sem_close(mutex);
        sem_close(empty);
        sem_close(full);
        sem_unlink(MUTEX);
        sem_unlink(FULL);
        sem_unlink(EMPTY);
        // Closing shared memory
        munmap(ptr, sizeof(STRUCT_BUFFER));
        printf("Producer %d finishes\n",getpid());
        exit(EXIT_SUCCESS);
    }
}
void put(int toPut, int * ptr, int * head, int * tail, sem_t * mutex, sem_t * full, sem_t * empty) {
    sem_wait(full);
    sem_wait(mutex);
    // Critical section
    ptr[*tail] = toPut;
    *tail = ((*tail) + 1) % N;
    sem_post(mutex);
    sem_post(empty);
}
int get(int * ptr, int * head, int * tail, sem_t * mutex, sem_t * full, sem_t * empty) {
    sem_wait(empty);
    sem_wait(mutex);
    // Critical section
    int toGet;
    toGet = ptr[*head];
    *head = ((*head) + 1) % N;
    sem_post(mutex);
    sem_post(full);
    return toGet;
}
Having inferred suitable definitions for your SYSC and SYSCN macros, I was able to build your program and confirm your observed misbehavior.
The primary error is to do with the head and tail pointers. When you declare them as pointers in main and initialize them with the statement head = tail = ptr; you are treating them as if they are pointers to the head and tail elements of the buffer themselves. It would be ok in that case for the head and tail pointers to initially alias each other and the base pointer.
In your get() and put() functions, however, you are treating head and tail as pointers to variables storing the indices of the head and tail elements. For example, put() stores with ptr[*tail] = toPut; and get() reads with toGet = ptr[*head];. For that use, it is not ok for those pointers to alias each other or the data. They then need to point to their own, separate objects.
As a secondary matter, the head and tail indices (ints, not pointers) need to be shared between the processes, because the consumer process accesses both. In the original code, the head and tail pointers used are both local variables of main, and so not shared between processes (each process has its own, independent copies of these). It looks like you might have planned to share the pointers via the shared STRUCT_BUFFER object, but ended up not doing so. This would be an appropriate place to put the head and tail indices that you need to share.
Additionally, I would recommend
-using an anonymous memory mapping instead of creating and mapping a POSIX shared memory segment. Aside from being simpler, that will eliminate issues that could arise from the shared memory segment living longer than you intended / needed or being shared by distinct instances of your program running concurrently.
-using unnamed (process-shared) semaphores stored in the shared memory region instead of named semaphores. This is slightly simpler, and eliminates issues that could arise from your semaphores living longer than you intended or being shared by distinct instances of your program running concurrently. These objects would need to be stored in shared memory, but you already have a setup for that. Just add these to STRUCT_BUFFER.
Having implemented all of the above, your STRUCT_BUFFER object will contain all the information the get() and put() functions need to work with, other than the particular value that put() should store. I strongly recommend modifying these functions to accept a pointer to the STRUCT_BUFFER instead of all those individual arguments.