Incorrect local variable value after pthread_join

237 views Asked by At

I am trying to sum up a 1000-element integer array (where each element is 1) with the pthread library by splitting the array into segments of size 10. So effectively, 100 threads are being used to do that. The result of this parallel operation is as expected (1000). But interestingly, the sequential sum which I calculated before creating the threads is being set to zero after my first call to pthread_join(). Not sure if I am missing something here. Can someone spot the bug here?

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// Number of consecutive array elements summed by each thread.
#define SEGMENT_SIZE 10
// Number of threads started by main(); each one handles one segment.
#define NUM_THREADS 100
// Shared input array: allocated and filled in main(), read by segment_sum().
int *array = NULL;

/*
 * Thread entry point: sums one SEGMENT_SIZE-element segment of the global
 * array.
 *
 * args is never dereferenced: it carries the segment index as an integer
 * value that main() cast to void *.
 * The return value is likewise the sum cast to void *, not a real address.
 */
void* segment_sum(void *args)
{
        // Pointer-to-int cast: gcc reports "cast from pointer to integer of
        // different size" here when int is narrower than a pointer.
        int index = (int)args;
        int sum = 0;

        // Walk elements [index * SEGMENT_SIZE, (index + 1) * SEGMENT_SIZE).
        for (int i = index * SEGMENT_SIZE; i < (index + 1) * SEGMENT_SIZE; i++) {
                sum += array[i];
        }

        // Int-to-pointer cast: gcc reports "cast to pointer from integer of
        // different size" here; the joiner must treat the result as a
        // pointer-sized value, not as an int.
        return (void *)sum;
}

/*
 * Fills a NUM_THREADS * SEGMENT_SIZE array with ones (counting them in
 * seq_res as it goes), starts one segment_sum thread per segment, then joins
 * every thread and accumulates the per-segment results in par_res.
 * Both totals are printed at the end.
 */
int main()
{
        pthread_t thread[NUM_THREADS];
        int res = 0;
        int seq_res = 0;
        int par_res = 0;

        // NOTE(review): the calloc() result is used below without a NULL check.
        array = calloc(1, sizeof(int) * NUM_THREADS * SEGMENT_SIZE);
        for (int i = 0; i < NUM_THREADS * SEGMENT_SIZE; i++) {
                array[i] = 1;
                seq_res += 1;
        }


        for (int i = 0; i < NUM_THREADS; i++) {
                // The loop index itself is passed as the thread argument, cast
                // to void * (gcc warns about the size mismatch on this line).
                res = pthread_create(&thread[i], NULL, segment_sum, (void *)i);
                if (res != 0) {
                        // Failure is only reported; the join loop below still
                        // runs over every slot of thread[].
                        printf("\nError creating new thread");
                }
        }

        printf("\nindex = %d", seq_res);                // the sequential sum here is 1000

        for (int i = 0; i < NUM_THREADS; i++) {
                int sum = 0;
                // BUG (the subject of the question): pthread_join() stores a
                // void * through its second argument, but it is given the
                // address of an int. Where void * is wider than int, that
                // store runs past the end of sum and overwrites neighbouring
                // stack memory.
                res = pthread_join(thread[i], (void **)&sum);
                if (res != 0) {
                        // NOTE(review): the message says "creating" although
                        // this is the join path.
                        printf("\nError creating new thread");
                }
                printf("\nindex = %d", seq_res);        // Here it is becoming zero!!!

                par_res += sum;
        }

        printf("\nmultithreaded sum: %d single threaded sum: %d\n", par_res, seq_res);
}
1

There is 1 answer

4
Rachid K. On BEST ANSWER

When you compile your program, try as much as possible to eliminate the warnings, as they often point out non-portable behaviors or hidden errors. Here the compilation points out the following:

pte.c: In function 'segment_sum':
pte.c:11:21: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
   11 |         int index = (int)args;
      |                     ^
pte.c:18:16: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
   18 |         return (void *)sum;
      |                ^
pte.c: In function 'main':
pte.c:36:69: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
   36 |                 res = pthread_create(&thread[i], NULL, segment_sum, (void *)i);
      |                                                                     ^

The parameter passed to the threads is an "int" cast into a pointer (and cast back into an "int" inside the thread). It is advised to pass the address of an "int" instead. Hence, you can define a per-thread context:

/* Per-thread context: one instance per worker thread. */
struct thd_ctx {
  pthread_t thread;  /* thread handle */
  int index;         /* index of the array segment this thread sums */
  int sum;           /* result of the thread's segment sum */
};

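pthread_join() is passed the address of a pointer (a "void *") into which it stores the value returned by the thread. In the original program it is given the address of an "int" instead: on a platform where a pointer is wider than an "int" (typically 8 bytes versus 4 on 64-bit systems), pthread_join() writes past the end of "sum" and overwrites whatever lies next to it on the stack, which here happens to be "seq_res". The upper half of the returned value is zero (each partial sum is only 10), hence the sequential sum turning to zero after the first join. The thread must return the address of the memory location into which it stored its result, not the value stored into it. Moreover, the thread should not return the address of an automatic variable (i.e. in its stack), because that variable no longer exists once the thread has terminated. The result must be the address of a global variable (or "something" visible from the joining thread) returned either directly or through pthread_exit(). In this enhancement of the program, we use the address of the "sum" field in the thread's context: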

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>

/* Number of consecutive array elements summed by each thread. */
#define SEGMENT_SIZE 10
/* Number of threads started by main(); each one handles one segment. */
#define NUM_THREADS 100
/* Shared input array: allocated and filled in main(), read by segment_sum(). */
int *array = NULL;


/* Per-thread context; main() keeps one instance per thread it starts. */
struct thd_ctx {
  pthread_t thread;  /* handle filled in by pthread_create() */
  int index;         /* segment index, set by main() before the thread starts */
  int sum;           /* segment total, written by segment_sum() */
};


/*
 * Thread entry point: totals one SEGMENT_SIZE-element slice of the global
 * array.
 *
 * args points to the struct thd_ctx describing this thread; its index field
 * selects the slice and its sum field receives the total.
 *
 * Returns the address of that sum field, so the joining thread can read the
 * result through the pointer delivered by pthread_join().
 */
void *segment_sum(void *args)
{
  struct thd_ctx *self = (struct thd_ctx *)args;
  const int first = self->index * SEGMENT_SIZE;
  const int last = (self->index + 1) * SEGMENT_SIZE;
  int pos = first;

  self->sum = 0;

  /* Accumulate elements [first, last) into the context. */
  while (pos < last) {
    self->sum += array[pos];
    pos++;
  }

  return (void *)&self->sum;
}


/*
 * Sums a NUM_THREADS * SEGMENT_SIZE array of ones twice and prints both
 * totals: once sequentially while filling the array, and once in parallel
 * with one segment_sum() thread per SEGMENT_SIZE-element slice.
 *
 * Returns 0 on success, 1 if the allocation, a pthread_create() or a
 * pthread_join() fails.
 */
int main(void)
{
  struct thd_ctx thd_ctx[NUM_THREADS];
  int started = 0;   /* threads successfully created so far */
  int joined = 0;    /* threads for which a join has been attempted */
  int status = 1;    /* pessimistic until everything has succeeded */
  int res = 0;
  int seq_res = 0;
  int par_res = 0;
  int i;

  /* Count/size form lets calloc() detect a multiplication overflow. */
  array = calloc(NUM_THREADS * SEGMENT_SIZE, sizeof *array);
  if (!array) {
    fprintf(stderr, "calloc(): error %d\n", errno);
    return 1;
  }

  for (i = 0; i < NUM_THREADS * SEGMENT_SIZE; i++) {
    array[i] = 1;
    seq_res += 1;
  }

  for (i = 0; i < NUM_THREADS; i++) {
    thd_ctx[i].index = i;
    res = pthread_create(&(thd_ctx[i].thread), NULL, segment_sum, &thd_ctx[i]);
    if (res != 0) {
      fprintf(stderr, "Error %d creating new thread#%d\n", res, i);
      goto cleanup;
    }
    started++;
  }

  printf("Index = %d\n", seq_res);   // the sequential sum here is 1000

  for (i = 0; i < started; i++) {
    /* pthread_join() stores a void *, so receive it in a real void *. */
    void *ret = NULL;

    res = pthread_join(thd_ctx[i].thread, &ret);
    joined = i + 1;   /* never join this thread again, even on failure */
    if (res != 0) {
      fprintf(stderr, "Error %d joining thread#%d\n", res, i);
      goto cleanup;
    }

    /* segment_sum() returned the address of its context's sum field. */
    par_res += *(int *)ret;

    printf("sum = %d\n", par_res);
  }

  printf("\nMultithreaded sum: %d single threaded sum: %d\n", par_res, seq_res);
  status = 0;

cleanup:
  /*
   * Threads that are still running read array and thd_ctx[]; wait for them
   * before releasing the array or leaving this stack frame.
   */
  for (i = joined; i < started; i++) {
    (void)pthread_join(thd_ctx[i].thread, NULL);
  }

  free(array);
  array = NULL;

  return status;
}