Domanda

Ho bisogno di un buffer circolare di dimensioni fisse (selezionabile in fase di esecuzione durante la creazione, non in fase di compilazione) che può contenere oggetti di qualsiasi tipo e deve essere molto alto prestazione. Non credo che ci saranno problemi di contesa di risorse poiché, sebbene si trovi in ??un ambiente incorporato multi-tasking, è cooperativo, quindi i compiti stessi possono gestirlo.

Il mio pensiero iniziale era quello di memorizzare una semplice struttura nel buffer che contenesse il tipo (enum / define semplice) e un puntatore vuoto al payload, ma voglio che questo sia il più veloce possibile, quindi sono aperto ai suggerimenti che comportano il bypass dell'heap.

In realtà sono felice di bypassare qualsiasi libreria standard per la velocità pura - da quello che ho visto del codice, non è fortemente ottimizzato per la CPU: sembra che abbiano appena compilato il codice C per cose come strcpy () e simili, non esiste un assembly codificato a mano.

Qualsiasi codice o idea sarebbe molto apprezzato. Le operazioni richieste sono:

  • crea un buffer con dimensioni specifiche.
  • metti alla coda.
  • prendi la testa.
  • restituisce il conteggio.
  • elimina un buffer.
È stato utile?

Soluzione

Puoi enumerare i tipi necessari al momento della codifica del buffer o devi essere in grado di aggiungere tipi in fase di esecuzione tramite chiamate dinamiche? Se il primo, allora creerei il buffer come un array allocato in heap di n-struct, in cui ogni struct è composta da due elementi: un tag enum che identifica il tipo di dati e un'unione di tutti i tipi di dati. Ciò che perdi in termini di spazio di archiviazione aggiuntivo per piccoli elementi, compensi in termini di non dover gestire l'allocazione / deallocazione e la conseguente frammentazione della memoria. Quindi devi solo tenere traccia degli indici di inizio e fine che definiscono gli elementi head e tail del buffer e assicurarti di calcolare il mod n quando incrementa / decrementa gli indici.

Altri suggerimenti

La soluzione più semplice sarebbe quella di tenere traccia delle dimensioni degli elementi e del numero di elementi, quindi creare un buffer del numero appropriato di byte:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}

Finché la lunghezza del buffer del tuo ring è una potenza di due, il binario incredibilmente veloce "quot" & amp; " l'operazione avvolgerà il tuo indice per te. Per la mia applicazione, sto visualizzando all'utente un segmento di audio da un ring buffer di audio acquisito da un microfono.

Mi assicuro sempre che la massima quantità di audio che può essere visualizzata sullo schermo sia molto inferiore alla dimensione del ring buffer. Altrimenti potresti leggere e scrivere dallo stesso pezzo. Questo probabilmente ti darebbe strani artefatti di visualizzazione.

Innanzitutto, il titolo. Non hai bisogno dell'aritmetica modulo per avvolgere il buffer se usi bit ints per contenere head & amp; coda "puntatori" e dimensionali in modo che siano perfettamente sincronizzati. IE: 4096 inserito in un int senza segno a 12 bit è 0 tutto da solo, non molestato in alcun modo. L'eliminazione dell'aritmetica del modulo, anche per potenze di 2, raddoppia la velocità - quasi esattamente.

10 milioni di iterazioni di riempimento e svuotamento di un buffer 4096 di qualsiasi tipo di elementi di dati impiegano 52 secondi sul mio Dell XPS 8500 i7 di terza generazione che utilizza il compilatore C ++ di Visual Studio 2010 con allineamento predefinito e 1/8192 di quello per servire un dato .

RX riscriverei i loop di test in main () in modo che non controllino più il flusso - che è, e dovrebbe essere, controllato dai valori di ritorno che indicano che il buffer è pieno o vuoto e che l'interruzione si rompe; dichiarazioni. IE: il filler e lo scolapiatti dovrebbero essere in grado di battersi l'uno contro l'altro senza corruzione o instabilità. Ad un certo punto spero di multi-thread questo codice, per cui quel comportamento sarà cruciale.

La funzione QUEUE_DESC (descrittore di coda) e l'inizializzazione forza tutti i buffer in questo codice ad essere una potenza di 2. Lo schema sopra NON funzionerà diversamente. Mentre sei sull'argomento, nota che QUEUE_DESC non è hardcoded, usa una costante manifest (#define BITS_ELE_KNT) per la sua costruzione. (Suppongo che una potenza di 2 sia sufficiente flessibilità qui)

Per rendere selezionabile il tempo di esecuzione della dimensione del buffer, ho provato diversi approcci (non mostrati qui) e ho deciso di usare USHRT per Head, Tail, EleKnt in grado di gestire un buffer FIFO [USHRT]. Per evitare l'aritmetica del modulo ho creato una maschera per & amp; & amp; con Head, Tail, ma quella maschera risulta essere (EleKnt -1), quindi basta usare quella. L'uso di USHRTS anziché bit ha migliorato le prestazioni del 15% su una macchina silenziosa. I core della CPU Intel sono sempre stati più veloci dei loro bus, quindi su una macchina occupata e condivisa, il confezionamento delle strutture dei dati ti consente di caricare ed eseguire prima di altri thread concorrenti. Trade-off.

Notare che l'archiviazione effettiva per il buffer è allocata sull'heap con calloc () e il puntatore si trova alla base della struttura, quindi la struttura e il puntatore hanno ESATTAMENTE lo stesso indirizzo. IE; non è necessario aggiungere alcun offset all'indirizzo di struttura per collegare i registri.

In questa stessa ottica, tutte le variabili associate alla manutenzione del buffer sono fisicamente adiacenti al buffer, legate nella stessa struttura, in modo che il compilatore possa creare un linguaggio di assemblaggio meraviglioso. Dovrai interrompere l'ottimizzazione in linea per vedere qualsiasi assembly, perché altrimenti viene schiacciato nell'oblio.

Per supportare il polimorfismo di qualsiasi tipo di dati, ho usato memcpy () invece di assegnazioni. Se hai bisogno solo della flessibilità per supportare un tipo di variabile casuale per compilazione, questo codice funziona perfettamente.

Per il polimorfismo, devi solo conoscere il tipo e i requisiti di conservazione. L'array di descrittori DATA_DESC fornisce un modo per tenere traccia di ogni dato che viene inserito in QUEUE_DESC.pBuffer in modo che possa essere recuperato correttamente. Vorrei solo allocare memoria pBuffer sufficiente per contenere tutti gli elementi del tipo di dati più grande, ma tenere traccia di quanto spazio di archiviazione in quel dato dato sta effettivamente utilizzando in DATA_DESC.dBytes. L'alternativa è reinventare un gestore di heap.

Ciò significa che UCHAR * pBuffer di QUEUE_DESC avrebbe un array parallelo per tenere traccia del tipo di dati e delle dimensioni, mentre la posizione di archiviazione di un datum in pBuffer rimarrebbe esattamente come è ora. Il nuovo membro sarebbe qualcosa di simile a DATA_DESC * pDataDesc o, forse, DATA_DESC DataDesc [2 ^ BITS_ELE_KNT] se riesci a trovare un modo per battere il tuo compilatore nella presentazione con un riferimento in avanti. Calloc () è sempre più flessibile in queste situazioni.

Dovresti comunque memcpy () in Q_Put (), Q_Get, ma il numero di byte effettivamente copiati sarebbe determinato da DATA_DESC.dBytes, non da QUEUE_DESC.EleBytes. Gli elementi sono potenzialmente tutti di diversi tipi / dimensioni per ogni dato put o get.

Credo che questo codice soddisfi i requisiti di velocità e dimensione del buffer e possa essere realizzato per soddisfare il requisito per 6 diversi tipi di dati. Ho lasciato le numerose apparecchiature di test, sotto forma di istruzioni printf (), in modo da poterti verificare (o meno) che il codice funzioni correttamente. Il generatore di numeri casuali dimostra che il codice funziona per qualsiasi combinazione testa / coda casuale.

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}

Ecco una semplice soluzione in C. Supponiamo che gli interrupt siano disattivati ??per ogni funzione. Nessun polimorfismo e amp; roba, solo buon senso.


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}

Una semplice implementazione potrebbe consistere in:

  • Un buffer, implementato come un array di dimensioni n, di qualunque tipo sia necessario
  • Un puntatore o indice di lettura (qualunque sia più efficiente per il tuo processore)
  • Un puntatore o indice di scrittura
  • Un contatore che indica la quantità di dati nel buffer (derivabile dai puntatori di lettura e scrittura, ma più veloce per monitorarli separatamente)

Ogni volta che si scrivono dati, si fa avanzare il puntatore di scrittura e si incrementa il contatore. Quando leggi i dati, aumenti il ??puntatore di lettura e diminuisci il contatore. Se uno dei due puntatori raggiunge n, impostalo su zero.

Non puoi scrivere se counter = n. Non puoi leggere se counter = 0.

Stile C, buffer ad anello semplice per numeri interi. Prima usa init che usa put and get. Se il buffer non contiene alcun dato, restituisce "0". zero.

//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount   512
int     sRingBuf[cRingBufCount];    // Ring Buffer
int     sRingBufPut;                // Input index address
int     sRingBufGet;                // Output index address
Bool    sRingOverWrite;

void    GetRingBufCount(void)
{
int     r;
`       r= sRingBufPut - sRingBufGet;
        if ( r < cRingBufCount ) r+= cRingBufCount;
        return r; 
}

void    InitRingBuffer(void)
{
        sRingBufPut= 0;
        sRingBufGet= 0;
}       

void    PutRingBuffer(int d)
{
        sRingBuffer[sRingBufPut]= d;
        if (sRingBufPut==sRingBufGet)// both address are like ziro
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            sRingBufGet= IncRingBufferPointer(sRingBufGet);
        }
        else //Put over write a data
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            if (sRingBufPut==sRingBufGet)
            {
                sRingOverWrite= Ture;
                sRingBufGet= IncRingBufferPointer(sRingBufGet);
            }
        }
}

int     GetRingBuffer(void)
{
int     r;
        if (sRingBufGet==sRingBufPut) return 0;
        r= sRingBuf[sRingBufGet];
        sRingBufGet= IncRingBufferPointer(sRingBufGet);
        sRingOverWrite=False;
        return r;
}

int     IncRingBufferPointer(int a)
{
        a+= 1;
        if (a>= cRingBufCount) a= 0;
        return a;
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top