Come si implementa un buffer circolare in C?
-
06-07-2019 - |
Domanda
Ho bisogno di un buffer circolare di dimensioni fisse (selezionabile in fase di esecuzione durante la creazione, non in fase di compilazione) che può contenere oggetti di qualsiasi tipo e deve essere molto alto prestazione. Non credo che ci saranno problemi di contesa di risorse poiché, sebbene si trovi in ??un ambiente incorporato multi-tasking, è cooperativo, quindi i compiti stessi possono gestirlo.
Il mio pensiero iniziale era quello di memorizzare una semplice struttura nel buffer che contenesse il tipo (enum / define semplice) e un puntatore vuoto al payload, ma voglio che questo sia il più veloce possibile, quindi sono aperto ai suggerimenti che comportano il bypass dell'heap.
In realtà sono felice di bypassare qualsiasi libreria standard per la velocità pura - da quello che ho visto del codice, non è fortemente ottimizzato per la CPU: sembra che abbiano appena compilato il codice C per cose come
Qualsiasi codice o idea sarebbe molto apprezzato. Le operazioni richieste sono:
- crea un buffer con dimensioni specifiche.
- metti alla coda.
- prendi la testa.
- restituisce il conteggio.
- elimina un buffer.
Soluzione
Puoi enumerare i tipi necessari al momento della codifica del buffer o devi essere in grado di aggiungere tipi in fase di esecuzione tramite chiamate dinamiche? Se il primo, allora creerei il buffer come un array allocato in heap di n-struct, in cui ogni struct è composta da due elementi: un tag enum che identifica il tipo di dati e un'unione di tutti i tipi di dati. Ciò che perdi in termini di spazio di archiviazione aggiuntivo per piccoli elementi, compensi in termini di non dover gestire l'allocazione / deallocazione e la conseguente frammentazione della memoria. Quindi devi solo tenere traccia degli indici di inizio e fine che definiscono gli elementi head e tail del buffer e assicurarti di calcolare il mod n quando incrementa / decrementa gli indici.
Altri suggerimenti
La soluzione più semplice sarebbe quella di tenere traccia delle dimensioni degli elementi e del numero di elementi, quindi creare un buffer del numero appropriato di byte:
typedef struct circular_buffer
{
void *buffer; // data buffer
void *buffer_end; // end of data buffer
size_t capacity; // maximum number of items in the buffer
size_t count; // number of items in the buffer
size_t sz; // size of each item in the buffer
void *head; // pointer to head
void *tail; // pointer to tail
} circular_buffer;
void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
cb->buffer = malloc(capacity * sz);
if(cb->buffer == NULL)
// handle error
cb->buffer_end = (char *)cb->buffer + capacity * sz;
cb->capacity = capacity;
cb->count = 0;
cb->sz = sz;
cb->head = cb->buffer;
cb->tail = cb->buffer;
}
void cb_free(circular_buffer *cb)
{
free(cb->buffer);
// clear out other fields too, just to be safe
}
void cb_push_back(circular_buffer *cb, const void *item)
{
if(cb->count == cb->capacity){
// handle error
}
memcpy(cb->head, item, cb->sz);
cb->head = (char*)cb->head + cb->sz;
if(cb->head == cb->buffer_end)
cb->head = cb->buffer;
cb->count++;
}
void cb_pop_front(circular_buffer *cb, void *item)
{
if(cb->count == 0){
// handle error
}
memcpy(item, cb->tail, cb->sz);
cb->tail = (char*)cb->tail + cb->sz;
if(cb->tail == cb->buffer_end)
cb->tail = cb->buffer;
cb->count--;
}
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024
typedef struct _ringBuffer {
UInt32 currentIndex;
UInt32 sizeOfBuffer;
double data[kNumPointsInMyBuffer];
} ringBuffer;
// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;
// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
// -1 for our binary modulo in a moment
int buffLen = myRingBuffer->sizeOfBuffer - 1;
int lastWrittenSample = myRingBuffer->currentIndex;
int idx;
for (int i=0; i < numsamples; ++i) {
// modulo will automagically wrap around our index
idx = (i + lastWrittenSample) & buffLen;
myRingBuffer->data[idx] = myData[i];
}
// Update the current index of our ring buffer.
myRingBuffer->currentIndex += numsamples;
myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}
Finché la lunghezza del buffer del tuo ring è una potenza di due, il binario incredibilmente veloce "quot" & amp; " l'operazione avvolgerà il tuo indice per te. Per la mia applicazione, sto visualizzando all'utente un segmento di audio da un ring buffer di audio acquisito da un microfono.
Mi assicuro sempre che la massima quantità di audio che può essere visualizzata sullo schermo sia molto inferiore alla dimensione del ring buffer. Altrimenti potresti leggere e scrivere dallo stesso pezzo. Questo probabilmente ti darebbe strani artefatti di visualizzazione.
Innanzitutto, il titolo. Non hai bisogno dell'aritmetica modulo per avvolgere il buffer se usi bit ints per contenere head & amp; coda "puntatori" e dimensionali in modo che siano perfettamente sincronizzati. IE: 4096 inserito in un int senza segno a 12 bit è 0 tutto da solo, non molestato in alcun modo. L'eliminazione dell'aritmetica del modulo, anche per potenze di 2, raddoppia la velocità - quasi esattamente.
10 milioni di iterazioni di riempimento e svuotamento di un buffer 4096 di qualsiasi tipo di elementi di dati impiegano 52 secondi sul mio Dell XPS 8500 i7 di terza generazione che utilizza il compilatore C ++ di Visual Studio 2010 con allineamento predefinito e 1/8192 di quello per servire un dato .
RX riscriverei i loop di test in main () in modo che non controllino più il flusso - che è, e dovrebbe essere, controllato dai valori di ritorno che indicano che il buffer è pieno o vuoto e che l'interruzione si rompe; dichiarazioni. IE: il filler e lo scolapiatti dovrebbero essere in grado di battersi l'uno contro l'altro senza corruzione o instabilità. Ad un certo punto spero di multi-thread questo codice, per cui quel comportamento sarà cruciale.
La funzione QUEUE_DESC (descrittore di coda) e l'inizializzazione forza tutti i buffer in questo codice ad essere una potenza di 2. Lo schema sopra NON funzionerà diversamente. Mentre sei sull'argomento, nota che QUEUE_DESC non è hardcoded, usa una costante manifest (#define BITS_ELE_KNT) per la sua costruzione. (Suppongo che una potenza di 2 sia sufficiente flessibilità qui)
Per rendere selezionabile il tempo di esecuzione della dimensione del buffer, ho provato diversi approcci (non mostrati qui) e ho deciso di usare USHRT per Head, Tail, EleKnt in grado di gestire un buffer FIFO [USHRT]. Per evitare l'aritmetica del modulo ho creato una maschera per & amp; & amp; con Head, Tail, ma quella maschera risulta essere (EleKnt -1), quindi basta usare quella. L'uso di USHRTS anziché bit ha migliorato le prestazioni del 15% su una macchina silenziosa. I core della CPU Intel sono sempre stati più veloci dei loro bus, quindi su una macchina occupata e condivisa, il confezionamento delle strutture dei dati ti consente di caricare ed eseguire prima di altri thread concorrenti. Trade-off.
Notare che l'archiviazione effettiva per il buffer è allocata sull'heap con calloc () e il puntatore si trova alla base della struttura, quindi la struttura e il puntatore hanno ESATTAMENTE lo stesso indirizzo. IE; non è necessario aggiungere alcun offset all'indirizzo di struttura per collegare i registri.
In questa stessa ottica, tutte le variabili associate alla manutenzione del buffer sono fisicamente adiacenti al buffer, legate nella stessa struttura, in modo che il compilatore possa creare un linguaggio di assemblaggio meraviglioso. Dovrai interrompere l'ottimizzazione in linea per vedere qualsiasi assembly, perché altrimenti viene schiacciato nell'oblio.
Per supportare il polimorfismo di qualsiasi tipo di dati, ho usato memcpy () invece di assegnazioni. Se hai bisogno solo della flessibilità per supportare un tipo di variabile casuale per compilazione, questo codice funziona perfettamente.
Per il polimorfismo, devi solo conoscere il tipo e i requisiti di conservazione. L'array di descrittori DATA_DESC fornisce un modo per tenere traccia di ogni dato che viene inserito in QUEUE_DESC.pBuffer in modo che possa essere recuperato correttamente. Vorrei solo allocare memoria pBuffer sufficiente per contenere tutti gli elementi del tipo di dati più grande, ma tenere traccia di quanto spazio di archiviazione in quel dato dato sta effettivamente utilizzando in DATA_DESC.dBytes. L'alternativa è reinventare un gestore di heap.
Ciò significa che UCHAR * pBuffer di QUEUE_DESC avrebbe un array parallelo per tenere traccia del tipo di dati e delle dimensioni, mentre la posizione di archiviazione di un datum in pBuffer rimarrebbe esattamente come è ora. Il nuovo membro sarebbe qualcosa di simile a DATA_DESC * pDataDesc o, forse, DATA_DESC DataDesc [2 ^ BITS_ELE_KNT] se riesci a trovare un modo per battere il tuo compilatore nella presentazione con un riferimento in avanti. Calloc () è sempre più flessibile in queste situazioni.
Dovresti comunque memcpy () in Q_Put (), Q_Get, ma il numero di byte effettivamente copiati sarebbe determinato da DATA_DESC.dBytes, non da QUEUE_DESC.EleBytes. Gli elementi sono potenzialmente tutti di diversi tipi / dimensioni per ogni dato put o get.
Credo che questo codice soddisfi i requisiti di velocità e dimensione del buffer e possa essere realizzato per soddisfare il requisito per 6 diversi tipi di dati. Ho lasciato le numerose apparecchiature di test, sotto forma di istruzioni printf (), in modo da poterti verificare (o meno) che il codice funzioni correttamente. Il generatore di numeri casuali dimostra che il codice funziona per qualsiasi combinazione testa / coda casuale.
enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>
#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//
#define BITS_ELE_KNT 12 //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct {
// USHRT dBytes:8; //amount of QUEUE_DESC.EleBytes storage used by datatype
// USHRT dType :3; //supports 8 possible data types (0-7)
// USHRT dFoo :5; //unused bits of the unsigned short host's storage
// } DATA_DESC;
// This descriptor gives a home to all the housekeeping variables
typedef struct {
UCHAR *pBuffer; // pointer to storage, 16 to 4096 elements
ULONG Tail :BITS_ELE_KNT; // # elements, with range of 0-4095
ULONG Head :BITS_ELE_KNT; // # elements, with range of 0-4095
ULONG EleBytes :8; // sizeof(elements) with range of 0-256 bytes
// some unused bits will be left over if BITS_ELE_KNT < 12
USHRT EleKnt :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
//USHRT Flags :(8*sizeof(USHRT) - BITS_ELE_KNT +1); // flags you can use
USHRT IsFull :1; // queue is full
USHRT IsEmpty :1; // queue is empty
USHRT Unused :1; // 16th bit of USHRT
} QUEUE_DESC;
// ---------------------------------------------------------------------------
// Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
// ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz) {
memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
//select buffer size from powers of 2 to receive modulo
// arithmetic benefit of bit uints overflowing
Q->EleKnt = (USHRT)pow(2.0, BitsForEleKnt);
Q->EleBytes = DataTypeSz; // how much storage for each element?
// Randomly generated head, tail a test fixture only.
// Demonstrates that the queue can be entered at a random point
// and still perform properly. Normally zero
srand(unsigned(time(NULL))); // seed random number generator with current time
Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
Q->Head = Q->Tail = 0;
// allocate queue's storage
if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes))) {
return NULL;
} else {
return Q;
}
}
// ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)
{
memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
if(Q->Tail == (Q->Head + Q->EleKnt)) {
// Q->IsFull = 1;
Q->Tail += 1;
return QUEUE_FULL_FLAG; // queue is full
}
Q->Tail += 1; // the unsigned bit int MUST wrap around, just like modulo
return QUEUE_OK; // No errors
}
// ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)
{
memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
Q->Head += 1; // the bit int MUST wrap around, just like modulo
if(Q->Head == Q->Tail) {
// Q->IsEmpty = 1;
return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
}
return QUEUE_OK; // No errors
}
//
// ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[]) {
// constrain buffer size to some power of 2 to force faux modulo arithmetic
int LoopKnt = 1000000; // for benchmarking purposes only
int k, i=0, Qview=0;
time_t start;
QUEUE_DESC Queue, *Q;
if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
printf("\nProgram failed to initialize. Aborting.\n\n");
return 0;
}
start = clock();
for(k=0; k<LoopKnt; k++) {
//printf("\n\n Fill'er up please...\n");
//Q->Head = Q->Tail = rand();
for(i=1; i<= Q->EleKnt; i++) {
Qview = i*i;
if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview)) {
//printf("\nQueue is full at %i \n", i);
//printf("\nQueue value of %i should be %i squared", Qview, i);
break;
}
//printf("\nQueue value of %i should be %i squared", Qview, i);
}
// Get data from queue until completely drained (empty)
//
//printf("\n\n Step into the lab, and see what's on the slab... \n");
Qview = 0;
for(i=1; i; i++) {
if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q)) {
//printf("\nQueue value of %i should be %i squared", Qview, i);
//printf("\nQueue is empty at %i", i);
break;
}
//printf("\nQueue value of %i should be %i squared", Qview, i);
}
//printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
}
printf("\nQueue time was %5.3f to fill & drain %i element queue %i times \n",
(dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
getchar();
return 0;
}
Ecco una semplice soluzione in C. Supponiamo che gli interrupt siano disattivati ??per ogni funzione. Nessun polimorfismo e amp; roba, solo buon senso.
#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;
// init
void buf_init()
{
pIn = pOut = buf; // init to any slot in buffer
pEnd = &buf[BUFSIZE]; // past last valid slot in buffer
full = 0; // buffer is empty
}
// add char 'c' to buffer
int buf_put(char c)
{
if (pIn == pOut && full)
return 0; // buffer overrun
*pIn++ = c; // insert c into buffer
if (pIn >= pEnd) // end of circular buffer?
pIn = buf; // wrap around
if (pIn == pOut) // did we run into the output ptr?
full = 1; // can't add any more data into buffer
return 1; // all OK
}
// get a char from circular buffer
int buf_get(char *pc)
{
if (pIn == pOut && !full)
return 0; // buffer empty FAIL
*pc = *pOut++; // pick up next char to be returned
if (pOut >= pEnd) // end of circular buffer?
pOut = buf; // wrap around
full = 0; // there is at least 1 slot
return 1; // *pc has the data to be returned
}
Una semplice implementazione potrebbe consistere in:
- Un buffer, implementato come un array di dimensioni n, di qualunque tipo sia necessario
- Un puntatore o indice di lettura (qualunque sia più efficiente per il tuo processore)
- Un puntatore o indice di scrittura
- Un contatore che indica la quantità di dati nel buffer (derivabile dai puntatori di lettura e scrittura, ma più veloce per monitorarli separatamente)
Ogni volta che si scrivono dati, si fa avanzare il puntatore di scrittura e si incrementa il contatore. Quando leggi i dati, aumenti il ??puntatore di lettura e diminuisci il contatore. Se uno dei due puntatori raggiunge n, impostalo su zero.
Non puoi scrivere se counter = n. Non puoi leggere se counter = 0.
Stile C, buffer ad anello semplice per numeri interi. Prima usa init che usa put and get. Se il buffer non contiene alcun dato, restituisce "0". zero.
//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount 512
int sRingBuf[cRingBufCount]; // Ring Buffer
int sRingBufPut; // Input index address
int sRingBufGet; // Output index address
Bool sRingOverWrite;
void GetRingBufCount(void)
{
int r;
` r= sRingBufPut - sRingBufGet;
if ( r < cRingBufCount ) r+= cRingBufCount;
return r;
}
void InitRingBuffer(void)
{
sRingBufPut= 0;
sRingBufGet= 0;
}
void PutRingBuffer(int d)
{
sRingBuffer[sRingBufPut]= d;
if (sRingBufPut==sRingBufGet)// both address are like ziro
{
sRingBufPut= IncRingBufferPointer(sRingBufPut);
sRingBufGet= IncRingBufferPointer(sRingBufGet);
}
else //Put over write a data
{
sRingBufPut= IncRingBufferPointer(sRingBufPut);
if (sRingBufPut==sRingBufGet)
{
sRingOverWrite= Ture;
sRingBufGet= IncRingBufferPointer(sRingBufGet);
}
}
}
int GetRingBuffer(void)
{
int r;
if (sRingBufGet==sRingBufPut) return 0;
r= sRingBuf[sRingBufGet];
sRingBufGet= IncRingBufferPointer(sRingBufGet);
sRingOverWrite=False;
return r;
}
int IncRingBufferPointer(int a)
{
a+= 1;
if (a>= cRingBufCount) a= 0;
return a;
}