Вопрос

Мне нужен кольцевой буфер фиксированного размера (выбираемый во время выполнения при его создании, а не во время компиляции), который может содержать объекты любого типа, и он должен быть очень высокая производительность.Я не думаю, что возникнут проблемы конкуренции за ресурсы, поскольку, хотя это и многозадачная встроенная среда, она является совместной средой, поэтому задачи сами могут справиться с этим.

Моя первоначальная мысль заключалась в том, чтобы сохранить в буфере простую структуру, которая будет содержать тип (простое перечисление/определение) и указатель void на полезную нагрузку, но я хочу, чтобы это было как можно быстрее, поэтому я открыт для предложений, которые включают обход куча.

На самом деле я рад обойти любую стандартную библиотеку для получения чистой скорости - судя по тому, что я видел в коде, она не сильно оптимизирована для ЦП:похоже, они только что скомпилировали код C для таких вещей, как strcpy() и так, сборки вручную не существует.

Любой код или идеи будут с благодарностью оценены.Необходимые операции:

  • создать буфер определенного размера.
  • положить в хвост.
  • получить из головы.
  • вернуть счет.
  • удалить буфер.
Это было полезно?

Решение

Можете ли вы перечислить типы, необходимые во время написания кода буфера, или вам нужно иметь возможность добавлять типы во время выполнения с помощью динамических вызовов?В первом случае я бы создал буфер как массив из n структур, выделенный в куче, где каждая структура состоит из двух элементов:тег перечисления, идентифицирующий тип данных, и объединение всех типов данных.То, что вы теряете в плане дополнительного хранилища для небольших элементов, вы компенсируете отсутствием необходимости иметь дело с выделением/освобождением и, как следствие, фрагментацией памяти.Тогда вам просто нужно отслеживать начальный и конечный индексы, которые определяют начальные и конечные элементы буфера, и обязательно вычислять mod n при увеличении/уменьшении индексов.

Другие советы

Самым простым решением было бы отслеживать размер и количество элементов, а затем создать буфер соответствующего количества байтов:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}

Пока длина вашего кольцевого буфера равна степени двойки, невероятно быстрая двоичная операция «&» обработает ваш индекс за вас.В своем приложении я показываю пользователю сегмент аудио из кольцевого буфера звука, полученного с микрофона.

Я всегда слежу за тем, чтобы максимальное количество звука, которое может отображаться на экране, было намного меньше размера кольцевого буфера.В противном случае вы можете читать и писать из одного и того же фрагмента.Это, вероятно, приведет к странным артефактам отображения.

Во-первых, заголовок.Вам не нужна арифметика по модулю для переноса буфера, если вы используете битовые целые числа для хранения «указателей» головы и хвоста и определяете их размер так, чтобы они были идеально синхронизированы.ИЕ:4096, вставленное в 12-битное беззнаковое целое число, само по себе равно 0, никоим образом не затронутое.Устранение арифметики по модулю, даже для степеней 2, удваивает скорость - почти точно.

10 миллионов итераций заполнения и очистки буфера 4096 элементами данных любого типа занимают 52 секунды на моем Dell XPS 8500 третьего поколения i7 с использованием компилятора C++ Visual Studio 2010 со встраиванием по умолчанию, и 1/8192 секунды из этого времени приходится на обслуживание данных.

Я бы RX переписал тестовые циклы в main(), чтобы они больше не управляли потоком, который контролируется и должен контролироваться возвращаемыми значениями, указывающими, что буфер заполнен или пуст, и сопутствующим прерыванием;заявления.ИЕ:наполнитель и слив должны иметь возможность ударяться друг о друга без повреждений и нестабильности.В какой-то момент я надеюсь сделать этот код многопоточным, и тогда это поведение станет решающим.

QUEUE_DESC (дескриптор очереди) и функция инициализации заставляют все буферы в этом коде иметь степень 2.В противном случае приведенная выше схема НЕ будет работать.Продолжая тему, обратите внимание, что QUEUE_DESC не запрограммирован жестко, он использует константу манифеста (#define BITS_ELE_KNT) для своего построения.(Я предполагаю, что степень 2 здесь достаточная гибкость)

Чтобы сделать выбор размера буфера во время выполнения, я попробовал разные подходы (здесь не показаны) и остановился на использовании USHRT для Head, Tail, EleKnt, способных управлять буфером FIFO [USHRT].Чтобы избежать арифметики по модулю, я создал маску для && с Head, Tail, но эта маска оказалась (EleKnt -1), поэтому просто используйте ее.Использование USHRTS вместо битовых целых увеличило производительность примерно на 15% на тихой машине.Ядра ЦП Intel всегда были быстрее своих шин, поэтому на загруженной общей машине упаковка структур данных позволяет вам загружаться и выполняться раньше других конкурирующих потоков.Компромиссы.

Обратите внимание, что фактическое хранилище для буфера выделяется в куче с помощью calloc(), а указатель находится в основании структуры, поэтому структура и указатель имеют ТОЧНО одинаковый адрес.ИП;не требуется добавлять смещение к адресу структуры для связывания регистров.

Точно так же все переменные, связанные с обслуживанием буфера, физически примыкают к буферу и связаны в одну структуру, поэтому компилятор может создавать красивый язык ассемблера.Вам придется отключить встроенную оптимизацию, чтобы увидеть любую сборку, потому что в противном случае она будет предана забвению.

Чтобы поддерживать полиморфизм любого типа данных, я использовал memcpy() вместо присваиваний.Если вам нужна гибкость для поддержки только одного типа случайной величины на каждую компиляцию, тогда этот код работает отлично.

Для полиморфизма вам просто нужно знать тип и требования к объему памяти.Массив дескрипторов DATA_DESC позволяет отслеживать все данные, помещаемые в QUEUE_DESC.pBuffer, чтобы их можно было правильно извлечь.Я бы просто выделил достаточно памяти pBuffer для хранения всех элементов самого большого типа данных, но отслеживал бы, какую часть этой памяти фактически использует данный элемент данных в DATA_DESC.dBytes.Альтернатива — заново изобрести менеджер кучи.

Это означает, что UCHAR *pBuffer QUEUE_DESC будет иметь параллельный сопутствующий массив для отслеживания типа и размера данных, в то время как место хранения данных в pBuffer останется таким же, как и сейчас.Новым членом будет что-то вроде DATA_DESC *pDataDesc или, возможно, DATA_DESC DataDesc[2^BITS_ELE_KNT], если вы сможете найти способ заставить свой компилятор выполнить отправку с помощью такой прямой ссылки.Calloc() всегда более гибок в таких ситуациях.

Вы по-прежнему будете использовать memcpy() в Q_Put(), Q_Get, но количество фактически скопированных байтов будет определяться DATA_DESC.dBytes, а не QUEUE_DESC.EleBytes.Потенциально все элементы имеют разные типы/размеры для любого данного put или get.

Я считаю, что этот код удовлетворяет требованиям к скорости и размеру буфера и может быть адаптирован для удовлетворения требований к шести различным типам данных.Я оставил множество тестовых приспособлений в виде операторов printf(), чтобы вы могли убедиться (или нет), что код работает правильно.Генератор случайных чисел демонстрирует, что код работает для любой случайной комбинации «голова/хвост».

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}

Вот простое решение на C.Предположим, прерывания отключены для каждой функции.Никакого полиморфизма и прочего, только здравый смысл.


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}

Простая реализация может состоять из:

  • Буфер, реализованный как массив размера n любого типа, который вам нужен.
  • Указатель чтения или индекс (в зависимости от того, что более эффективно для вашего процессора)
  • Указатель записи или индекс
  • Счетчик, показывающий, сколько данных находится в буфере (полученный из указателей чтения и записи, но быстрее отслеживать его отдельно)

Каждый раз, когда вы записываете данные, вы перемещаете указатель записи и увеличиваете счетчик.Когда вы читаете данные, вы увеличиваете указатель чтения и уменьшаете счетчик.Если любой указатель достигает n, установите его в ноль.

Вы не можете писать, если счетчик = n.Вы не можете читать, если счетчик = 0.

Стиль C, простой кольцевой буфер для целых чисел.Сначала используйте init, а затем используйте put и get.Если буфер не содержит никаких данных, он возвращает «0» ноль.

//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount   512
int     sRingBuf[cRingBufCount];    // Ring Buffer
int     sRingBufPut;                // Input index address
int     sRingBufGet;                // Output index address
Bool    sRingOverWrite;

void    GetRingBufCount(void)
{
int     r;
`       r= sRingBufPut - sRingBufGet;
        if ( r < cRingBufCount ) r+= cRingBufCount;
        return r; 
}

void    InitRingBuffer(void)
{
        sRingBufPut= 0;
        sRingBufGet= 0;
}       

void    PutRingBuffer(int d)
{
        sRingBuffer[sRingBufPut]= d;
        if (sRingBufPut==sRingBufGet)// both address are like ziro
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            sRingBufGet= IncRingBufferPointer(sRingBufGet);
        }
        else //Put over write a data
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            if (sRingBufPut==sRingBufGet)
            {
                sRingOverWrite= Ture;
                sRingBufGet= IncRingBufferPointer(sRingBufGet);
            }
        }
}

int     GetRingBuffer(void)
{
int     r;
        if (sRingBufGet==sRingBufPut) return 0;
        r= sRingBuf[sRingBufGet];
        sRingBufGet= IncRingBufferPointer(sRingBufGet);
        sRingOverWrite=False;
        return r;
}

int     IncRingBufferPointer(int a)
{
        a+= 1;
        if (a>= cRingBufCount) a= 0;
        return a;
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top