Domanda

Sto scrivendo una matrice sparsa risolutore con il metodo di Gauss-Seidel. Con profilatura, ho deciso che circa la metà del tempo di mio programma è speso all'interno del risolutore. La parte prestazioni critiche è la seguente:

size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
    for (size_t x = 1; x < d_nx - 1; ++x) {
        d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
        ++ic; ++iw; ++ie; ++is; ++in;
    }
    ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}

Tutti gli array coinvolti sono di tipo float. In realtà, essi non sono array ma oggetti con un operatore [] sovraccarico, che (credo) dovrebbe essere ottimizzato distanza, ma è definita come segue:

inline float &operator[](size_t i) { return d_cells[i]; }
inline float const &operator[](size_t i) const { return d_cells[i]; }

Per d_nx = d_ny = 128, questo può essere eseguito circa 3500 volte al secondo su un processore Intel i7 920. Questo significa che il corpo del ciclo interno viene eseguito 3500 * 128 * 128 = 57 milioni di volte al secondo. Dal momento che solo un po 'di semplice aritmetica è coinvolto, che mi sembra un numero basso per un processore 2,66 GHz.

Forse non è limitato dalla potenza della CPU, ma la larghezza di banda della memoria? Ebbene, una 128 * 128 float matrice mangia 65 kB, quindi tutti i 6 matrici dovrebbero facilmente inseriscono nella cache L3 della CPU (che è 8 MB). Supponendo che nulla viene memorizzata nella cache in registri, conto 15 accessi di memoria nel corpo ciclo interno. In un sistema a 64 bit è 120 byte per ogni iterazione, quindi 57 milioni * 120 byte = 6.8 Gb / s. La cache L3 corre a 2,66 GHz, quindi è lo stesso ordine di grandezza. La mia ipotesi è che la memoria è davvero il collo di bottiglia.

Per accelerare l'operazione, ho tentato il seguente:

  • Compila con g++ -O3. (Beh, avevo fatto questo fin dall'inizio.)

  • parallelizzazione più di 4 core utilizzando pragma OpenMP. Devo cambiare l'algoritmo Jacobi per evitare di legge e scrive allo stesso array. Ciò richiede che faccio il doppio delle iterazioni, portando ad un risultato netto di circa la stessa velocità.

  • Fiddling con dettagli di implementazione del corpo del ciclo, ad esempio utilizzando i puntatori al posto degli indici. Nessun effetto.

Qual è l'approccio migliore per accelerare questo ragazzo in su? Sarebbe utile per riscrivere il corpo interno in assemblea (avrei dovuto imparare che prima)? Devo eseguire questo sulla GPU, invece (che so come fare, ma è una seccatura)? Tutte le altre idee brillanti?

(NB I non prendere "no" come risposta, come in: "non può essere fatto molto più veloce, perché ...")

Aggiorna : come richiesto, ecco un programma completo:

#include <iostream>
#include <cstdlib>
#include <cstring>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

void solve(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

I compilare ed eseguire come segue:

$ g++ -o gstest -O3 gstest.cpp
$ time ./gstest 8000
0

real    0m1.052s
user    0m1.050s
sys     0m0.010s

(Fa 8000 invece di 3500 iterazioni al secondo perché il mio programma "reale" fa un sacco di altre cose troppo. Ma è rappresentativo.)

Aggiorna 2 : Mi è stato detto che i valori unititialized potrebbero non essere rappresentativi perché i valori Nan e Inf possono rallentare le cose. Ora cancellazione della memoria nel codice di esempio. Non fa differenza per me in velocità di esecuzione, però.

È stato utile?

Soluzione

Un paio di idee:

  1. Usa SIMD. Si potrebbe caricare 4 carri alla volta da ciascun array in un registro SIMD (SSE per esempio su Intel, VMX su PowerPC). Lo svantaggio di questo è che alcuni dei valori d_x sarà "stantio" in modo che il tasso di convergenza soffrirà (ma non così male come un'iterazione Jacobi); è difficile dire se gli offset SpeedUp esso.

  2. SOR . E 'semplice, non aggiunge molto di calcolo, e può migliorare la vostra velocità di convergenza abbastanza bene, anche per un valore di rilassamento relativamente conservatore (diciamo 1,5).

  3. Usa gradiente coniugato. Se questo è per la fase proiezione di una simulazione fluido (cioè enforcing non comprimibilità), si dovrebbe essere in grado di applicare CG e ottenere un migliore tasso convergenza molto. Un buon precondizionatore aiuta ancora di più.

  4. Usa un risolutore specializzata. Se il sistema lineare nasce dalla Poisson equazione , si può fare ancora meglio gradiente coniugato con una FFT metodi basati.

Se si può spiegare di più su ciò che il sistema si sta cercando di risolvere sembra, io probabilmente può dare qualche consiglio su 3 # e # 4.

Altri suggerimenti

Credo che sono riuscito a ottimizzarlo, ecco un codice, creare un nuovo progetto di VC ++, aggiungere questo codice e semplicemente compilare sotto "Release".

#include <iostream>
#include <cstdlib>
#include <cstring>

#define _WIN32_WINNT 0x0400
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#include <conio.h>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void step_new() {
    //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float
        *d_b_ic,
        *d_w_ic,
        *d_e_ic,
        *d_x_ic,
        *d_x_iw,
        *d_x_ie,
        *d_x_is,
        *d_x_in,
        *d_n_ic,
        *d_s_ic;

    d_b_ic = d_b;
    d_w_ic = d_w;
    d_e_ic = d_e;
    d_x_ic = d_x;
    d_x_iw = d_x;
    d_x_ie = d_x;
    d_x_is = d_x;
    d_x_in = d_x;
    d_n_ic = d_n;
    d_s_ic = d_s;

    for (size_t y = 1; y < d_ny - 1; ++y)
    {
        for (size_t x = 1; x < d_nx - 1; ++x)
        {
            /*d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
            *d_x_ic = *d_b_ic
                - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
                - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
            //++ic; ++iw; ++ie; ++is; ++in;
            d_b_ic++;
            d_w_ic++;
            d_e_ic++;
            d_x_ic++;
            d_x_iw++;
            d_x_ie++;
            d_x_is++;
            d_x_in++;
            d_n_ic++;
            d_s_ic++;
        }
        //ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        d_b_ic += 2;
        d_w_ic += 2;
        d_e_ic += 2;
        d_x_ic += 2;
        d_x_iw += 2;
        d_x_ie += 2;
        d_x_is += 2;
        d_x_in += 2;
        d_n_ic += 2;
        d_s_ic += 2;
    }
}

void solve_original(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_original();
    }
}
void solve_new(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_new();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);

    if(argc < 3)
        printf("app.exe (x)iters (o/n)algo\n");

    bool bOriginalStep = (argv[2][0] == 'o');
    size_t iters = atoi(argv[1]);

    /*printf("Press any key to start!");
    _getch();
    printf(" Running speed test..\n");*/

    __int64 freq, start, end, diff;
    if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
        throw "Not supported!";
    freq /= 1000000; // microseconds!
    {
        ::QueryPerformanceCounter((LARGE_INTEGER*)&start);
        if(bOriginalStep)
            solve_original(iters);
        else
            solve_new(iters);
        ::QueryPerformanceCounter((LARGE_INTEGER*)&end);
        diff = (end - start) / freq;
    }
    printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
    //_getch();


    //cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Esegui in questo modo:

app.exe 10000 o

app.exe 10000 n

"o" mezzo vecchio codice, la tua.

"n" è mio, quello nuovo.

I miei risultati: Velocità (originale):

1515028

1523171

1495988

Velocità (nuovo):

966.012

984.110

1006045

Il miglioramento di circa il 30%.

La logica dietro: Hai utilizzato i contatori indice per l'accesso / manipolare. Io uso i puntatori. Durante l'esecuzione, breakpoint in una certa linea codice di calcolo in debugger di VC ++ s ', e premere F8. Otterrete la finestra disassembler. La vedrete i codici operativi prodotte (assemblaggio di codice).

In ogni caso, aspetto:

int * x = ...;

x [3] = 123;

Questo dice al PC per inserire il puntatore x a un registro (dire EAX). L'add esso (3 sizeof * (int)). Solo allora, impostare il valore a 123.

L'approccio puntatori è molto meglio, come si può capire, perché abbiamo tagliato il processo di aggiunta, in realtà gestiamo noi stessi, quindi in grado di ottimizzare in base alle esigenze.

Spero che questo aiuta.

Nota a margine per di stackoverflow.com personale: Grande sito, spero che ho sentito parlare molto tempo fa!

Per prima cosa, sembra che ci sia un problema di pipelining qui. Il ciclo legge dal valore in d_x che è stato appena scritto, ma a quanto pare deve attendere che in scrittura al completo. Basta riorganizzare l'ordine del calcolo, fare qualcosa di utile mentre si sta aspettando, lo rende quasi due volte più veloce:

d_x[ic] = d_b[ic]
                        - d_e[ic] * d_x[ie]
    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
    - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;

E 'stato Eamon Nerbonne che capito questo. Molti upvotes a lui! Non avrei mai immaginato.

di Poni aspetto risposta come quella giusta per me.

voglio solo far notare che in questo tipo di problema, è spesso benefici guadagnare dalla località di memoria. In questo momento, gli array b,w,e,s,n sono tutti in luoghi separati in memoria. Se si potesse non montare il problema nella cache L3 (per lo più in L2), allora questo sarebbe male, e una soluzione di questo tipo sarebbe utile:

size_t d_nx = 128, d_ny = 128;
float *d_x;

struct D { float b,w,e,s,n; };
D *d;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d[ic].b
                - d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
                - d[ic].s * d_x[is] - d[ic].n * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d = new D[n]; memset(d,0,n * sizeof(D));
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Ad esempio, questa soluzione a 1280x1280 è un po 'meno di 2x più veloce di soluzione di Poni (13s vs 23s nella mia prova - l'implementazione originale è quindi 22s), mentre a 128x128 E' il 30% più lento . (7s contro 10s - l'originale è 10s)

(iterazioni sono stati scalato fino a 80000 per il caso base, e 800 per il 100x più grande caso di 1280x1280.)

ti penso abbia ragione sulla memoria di essere un collo di bottiglia. Si tratta di un ciclo molto semplice con solo alcune semplici operazioni aritmetiche per ogni iterazione. ic, IW, cioè, è, in indici sembrano essere su lati opposti della matrice quindi immagino che c'è un gruppo di cache miss lì.

Non sono un esperto in materia, ma ho visto che ci sono diverse pubblicazioni accademiche sul miglioramento della utilizzo della cache del metodo di Gauss-Seidel.

Un altro possibile ottimizzazione è l'uso della variante rosso-nero, in cui i punti vengono aggiornati in due spazza in una scacchiera-come il modello. In questo modo, tutti gli aggiornamenti in uno sweep sono indipendenti e possono essere parallelizzati.

Suggerisco di mettere in alcune dichiarazioni di prefetch e anche la ricerca di "dati di progettazione oriented":

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
    float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
// Perform the prefetch
// Sorting these statements by array may increase speed;
//    although sorting by index name may increase speed too.
            db_ic = d_b[ic];
            dw_ic = d_w[ic];
            dx_iw = d_x[iw];
            de_ic = d_e[ic];
            dx_ie = d_x[ie];
            ds_ic = d_s[ic];
            dx_is = d_x[is];
            dn_ic = d_n[ic];
            dx_in = d_x[in];
// Calculate
            d_x[ic] = db_ic
                - dw_ic * dx_iw - de_ic * dx_ie
                - ds_ic * dx_is - dn_ic * dx_in;
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

Questo è diverso dal vostro secondo metodo in quanto i valori sono copiati variabili temporanee locali prima che venga eseguito il calcolo.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top