2015-07-05 20 views
7

Uczę się przetwarzania równoległego za pomocą Pthreadów. Mam czterordzeniowy procesor. Niestety, zrównoleglona część poniższego kodu działa z grubsza 5 razy wolniej niż nierównoległy kod. Co ja tu robię źle? Z góry dziękuję za pomoc.POSIX Wątki nie przyspieszające w C

#include <stdio.h> 
#include <time.h> 
#include <pthread.h> 
#include <stdlib.h> 
#define NTHREADS 4 
#define SIZE NTHREADS*10000000 

struct params { 
    int * arr; 
    int sum; 
}; 

/* The worker function for the pthreads */ 
void * myFun (void * x){ 
    int i; 
    struct params * b = (struct params *) x; 
    for (i = 0; i < (int)(SIZE/NTHREADS); ++i){ 
    b->sum += b->arr[i]; 
    } 
    return NULL; 
} 

/* unparallelized summing function*/ 
int arrSum(int * arr, int size){ 
    int sum = 0; 
    for (int i = 0; i != size; ++i){ 
    sum += arr[i]; 
    } 
    return sum; 
} 

int main(int argc, char * argv[]){ 
    clock_t begin, end; 
    double runTime; 
    int rc, i; 
    int sum1, sum2 = 0; 
    pthread_t threads[NTHREADS]; 

    /* create array to sum over */ 
    int * myArr = NULL; 
    myArr = (int *) calloc(SIZE, sizeof(int)); 
    if (myArr == NULL){ 
    printf("problem allocating memory\n"); 
    return 1; 
    } 
    for (int i = 0; i < SIZE; ++i){ 
    myArr[i] = 1; 
    } 

    /* create array of params structs to feed to threads */ 
    struct params p; 
    p.sum = 0; 
    struct params inputs[NTHREADS]; 
    for(i = 0; i != NTHREADS; ++i){ 
    p.arr = myArr + i*(int)(SIZE/NTHREADS); 
    inputs[i] = p; 
    } 

    /* spawn the threads */ 
    begin = clock(); 
    for(i = 0; i != NTHREADS; ++i){ 
    rc = pthread_create(&threads[i], NULL, myFun, (void *) &inputs[i]); 
    } 

    /* wait for threads to finish */ 
    for(i = 0; i != NTHREADS; ++i){ 
    rc = pthread_join(threads[i], NULL); 
    } 
    end = clock(); 
    runTime = (double)(end - begin)/CLOCKS_PER_SEC; 
    printf("Parallelized code run time: %f\n", runTime); 

    /* run the unparallelized code */ 
    begin = clock(); 
    sum2 = arrSum(myArr, SIZE); 
    end = clock(); 
    runTime = (double)(end - begin)/CLOCKS_PER_SEC; 
    printf("Unparallelized code run time: %f\n", runTime); 

    /* consolidate and print results from threads */ 
    for(i = 0; i != NTHREADS; ++i){ 
    sum1 += inputs[i].sum; 
    } 
    printf("sum1, sum2: %d, %d \n", sum1, sum2); 

    free(myArr); 

    /* be disappointed when my parallelized code showed no speedup */ 
    return 1; 
} 
+1

Czy istnieje jakikolwiek powód, aby dodać znacznik C++? – Olaf

+0

@Jeśli kod jest prawidłowy zarówno w C++, jak i C. Być może OP nie obchodzi, czy odpowiedź będzie C lub C++. –

+1

@ Hi-Angel: Występują pewne różnice między tymi dwoma językami. Na przykład w C nie powinieneś używać 'void *', podczas gdy w C++ musisz. Ogólnie nie powinieneś pisać kodu takiego jak ten w C++, więc zakładam, że jest to C. – Olaf

Odpowiedz

2

Brakuje jednego ważnego aspektu programowania równoległego.

Wątki robocze należy tworzyć raz dla każdego procesu, a nie dla każdego zadania.

Tworzenie i niszczenie wątków wymaga czasu.

Rozwiązaniem jest użycie puli wątków i wysłanie zadań do puli.

Moja sugestia to użycie OpenMP, która znacznie upraszcza to zadanie i współpracuje z wieloma kompilatorami.

przykład:

int sum = 0 
#pragma omp for shared(sum) 
for(int i=0; i<SIZE; ++i) 
{ 
    #pragma omp atomic 
    sum += myArr[i] 
} 

do tej pracy szybciej zrobić odwijanie pętli - np obliczyć sumę 8 liczb w pojedynczym zakresie pętli for.

+0

dzięki za odpowiedź. Jednak nie jestem pewien, co masz na myśli mówiąc "wątki muszą być tworzone raz na proces, a nie raz dla każdego zadania". Rozumiem, że aby mieć szansę na posiadanie wszystkich czterech rdzeni jednocześnie pracując nad zsumowaniem części tablicy, musiałbym mieć 4 wątki uruchomione. Czy mógłbyś rozwinąć trochę więcej? Dzięki. – Thirdeye

+1

Utwórz 4 wątki raz na początku programu. Tworzenie wątków wymaga czasu. Utworzone wątki czekają na zadanie (np. Przez muteks). Po otrzymaniu zadania wątki oczekują na kolejne zadanie. Nazywa się to pulą wątków. To, co robisz, to pomiar czasu, który zajmuje wytworzenie ORAZ zniszczenie wątków, który jest stały czas niezależnie od obciążenia. Jeśli potrzebujesz więcej zadań, powinieneś ponownie użyć istniejących wątków. – egur

+0

Teraz rozumiem. Dzięki. – Thirdeye

2

Głównym problemem jest to, że używasz clock() which does not return the wall time but the cumulative CPU time. Jest to najczęstszy błąd związany z tagiem OpenMP z SO (i jeśli lista częstotliwości była użyteczna na SO, powinna to pokazać).

Najprostszym sposobem uzyskania czasu na ścianie jest użycie funkcji OpenMP: omp_get_wtime(). Działa to na Linuksie i Windowsie z GCC, ICC i MSVC (i zakładam Clang, który obsługuje teraz OpenMP 3.1).

Kiedy używam tego z kodem I mi na cztery rdzenia/wątku ośmiu hiper-systemu i7 IVB:

Parallelized code run time: 0.048492 
Unparallelized code run time: 0.115124 
sum1, sum2: 400000000, 400000000 

niektórych innych uwag. Twoje planowanie jest podatne na błędy. Ustawić tablicę dla każdego wątku do

p.arr = myArr + i*(int)(SIZE/NTHREADS); 

I wtedy każdy bieg gwintu nad (SIZE/NTHREADS). Może to dawać błędne wyniki błędom zaokrąglania dla niektórych wartości SIZE i NTHREADS.

powinien mieć każdy bieg gwintu nad

int start = ithread*SIZE/NTHREADS; 
int finish = (ithreads+1)*SIZE/NTHREADS; 

I wtedy każdy punkt gwintu do początku tablicy i zrobić

int sum = 0; 
for (i = start; i < finish; ++i){ 
    sum += b->arr[i]; 
} 

Jest to w zasadzie co OpenMP na schedule(static) robi. W rzeczywistości można uzyskać ten sam efekt przy użyciu OpenMP pthreads wykonując

int sum = 0; 
#pragma omp parallel for reduction(+:sum) 
for (int i = 0; i < size; ++i){ 
    sum += arr[i]; 
} 

Oto kod użyłem

//gcc -O3 -std=gnu99 t.c -lpthread -fopenmp 
#include <stdio.h> 
#include <time.h> 
#include <pthread.h> 
#include <stdlib.h> 
#include <omp.h> 

#define NTHREADS 4 
#define SIZE NTHREADS*100000000 

struct params { 
    int * arr; 
    int sum; 
}; 

/* The worker function for the pthreads */ 
void * myFun (void * x){ 
    int i; 
    struct params * b = (struct params *) x; 
    int sum = 0; 
    for (i = 0; i < (int)(SIZE/NTHREADS); ++i){ 
    sum += b->arr[i]; 
    } 
    b->sum = sum; 
    return NULL; 
} 

/* unparallelized summing function*/ 
int arrSum(int * arr, int size){ 
    int sum = 0; 
    for (int i = 0; i < size; ++i){ 
    sum += arr[i]; 
    } 
    return sum; 
} 

int main(int argc, char * argv[]) { 
    double runTime; 
    int rc, i; 
    int sum1, sum2 = 0; 
    pthread_t threads[NTHREADS]; 

    /* create array to sum over */ 
    int * myArr = NULL; 
    myArr = (int *) calloc(SIZE, sizeof(int)); 
    if (myArr == NULL){ 
    printf("problem allocating memory\n"); 
    return 1; 
    } 
    for (int i = 0; i < SIZE; ++i){ 
    myArr[i] = 1; 
    } 

    /* create array of params structs to feed to threads */ 
    struct params p; 
    p.sum = 0; 
    struct params inputs[NTHREADS]; 
    for(i = 0; i < NTHREADS; ++i){ 
    p.arr = myArr + i*(int)(SIZE/NTHREADS); 
    inputs[i] = p; 
    } 

    /* spawn the threads */ 
    runTime = -omp_get_wtime(); 
    for(i = 0; i != NTHREADS; ++i){ 
    rc = pthread_create(&threads[i], NULL, myFun, (void *) &inputs[i]); 
    } 

    /* wait for threads to finish */ 
    for(i = 0; i != NTHREADS; ++i){ 
    rc = pthread_join(threads[i], NULL); 
    } 

    runTime += omp_get_wtime(); 
    printf("Parallelized code run time: %f\n", runTime); 

    /* run the unparallelized code */ 
    runTime = -omp_get_wtime(); 
    sum2 = arrSum(myArr, SIZE); 
    runTime += omp_get_wtime(); 
    printf("Unparallelized code run time: %f\n", runTime); 

    /* consolidate and print results from threads */ 
    for(i = 0; i != NTHREADS; ++i){ 
    sum1 += inputs[i].sum; 
    } 
    printf("sum1, sum2: %d, %d \n", sum1, sum2); 

    free(myArr); 

    /* be disappointed when my parallelized code showed no speedup */ 
    return 1; 
}