C /C++ Lock-free (ou bloqueio) Buffer de Anel que SUBSTITUI os dados mais antigos?
-
25-09-2019 - |
Pergunta
Eu estou tentando encontrar uma maneira de fazer um Bloqueio Livre OU Não-bloqueio maneira de fazer um Buffer em Anel por um único consumidor / consumidor único que irá gravar os dados mais antigos int buffer.Eu li um monte de lock-free algoritmos que funcionam quando você "return false" se o buffer está cheio--ou seja, não adicionar;mas eu não posso encontrar até mesmo pseudo-código, que fala sobre como fazê-lo quando você precisa substituir os dados mais antigos.
Eu estou usando o GCC 4.1.2 (restrição no trabalho, eu não consigo atualizar a versão...) e eu tenho o Impulso de bibliotecas, e no passado eu fiz o meu próprio Atômica< T > tipo de variável que segue muito de perto as próximas especificação (não é perfeito, mas é thread-safe e faz o que eu preciso).
Quando eu pensei sobre isso, eu percebi que usando estas atômicos realmente deve cuidar do problema.alguns áspero psuedo-código para o que eu estava pensando:
template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;
unsigned int getNextIndex(unsigned int i)
{
return (i + 1 ) % size;
}
public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
if(writeIndex == getNextIndex(readIndex)) //1
{
readIndex = getNextIndex(readIndex); //2
}
buf[writeIndex] = t;
writeIndex = getNextIndex(writeIndex); //3
}
bool consume(T& t)
{
if(readIndex == writeIndex) //4
return false;
t = buf[readIndex];
readIndex = getNexIndex(readIndex); //5
return true;
}
};
Tanto quanto eu posso dizer, não há situações de impasse aqui, então estamos seguros de que (Se a minha implementação acima é errado, mesmo em sua pseudo-código, leve, a crítica construtiva é sempre bem-vindo).No entanto,a GRANDE condição de corrida que eu encontrei foi:
vamos assumir que o buffer está cheio.que é, writeIndex +1 = readIndex;(1) ocorre, assim como consumir está sendo chamado.e é verdade (4) é falso, então, passamos a ler do buffer (5) ocorre, e o readIndex é avançada (portanto, há, de fato, o espaço em buffer (2) ocorre, avançando readIndex NOVAMENTE, assim, PERDER o valor.
Basicamente, é um clássico problema do writter deve modificar o leitor, fazendo com que uma condição de corrida.Sem, na verdade, o bloqueio de toda a lista toda vez que eu acesso ele, eu não posso pensar de uma maneira para evitar que isso aconteça.O que eu estou ausente??
Solução
- Comece com um único produtor/fila de consumidores múltiplos com garantias de progresso apropriadas.
- Se a fila estiver cheia e o impulso falharia, apareça um valor. Depois, haverá espaço para aumentar o novo valor.
Outras dicas
O que estou perdendo??
Grande quantidade:
- Digamos que você consuma enquanto está sendo substituído pelo produtor - como você está detectando/lidando com isso?
- muitas opções - por exemplo,
do {
copie o valor; A cópia da verificação tem integridade usando a sequência de modificação NUM etc.} while (
corrupto)
- muitas opções - por exemplo,
- Usar números atômicos não é suficiente - você também precisa usar loops no estilo CAS para afetar os incrementos do índice (embora eu assuma que você sabe que, já que você diz que já leu extensivamente nisso)
- barreiras de memória
Mas, vamos escrever isso como estar abaixo do seu nível de pseudo-código e considerar sua pergunta explícita:
- O ponto (5) exigirá uma operação CAS. Se o ReadIndex foi amostrado/copiado corretamente no topo
consume()
- antes do (possivelmente corrupto)t
foi copiado - então a instrução CAS falhará se já foi incrementado pelo produtor. Em vez da reamosa usual e tente novamente CAS, continue.
Aqui está um código de buffer circular atômica variáveis de eu ter criado recentemente.Eu modifiquei para "substituir" os dados em vez de retornar false.Aviso de isenção - não é o grau de produção testado ainda.
template<int capacity, int gap, typename T> class nonblockigcircular {
/*
* capacity - size of cicular buffer
* gap - minimum safety distance between head and tail to start insertion operation
* generally gap should exceed number of threads attempting insertion concurrently
* capacity should be gap plus desired buffer size
* T - is a data type for buffer to keep
*/
volatile T buf[capacity]; // buffer
std::atomic<int> t, h, ph, wh;
/* t to h data available for reading
* h to ph - zone where data is likely written but h is not updated yet
* to make sure data is written check if ph==wh
* ph to wh - zone where data changes in progress
*/
bool pop(T &pwk) {
int td, tnd;
do {
int hd=h.load()%capacity;
td=t.load()%capacity;
if(hd==td) return false;
tnd=(td+1)%capacity;
} while(!t.compare_exchange_weak(td, tnd));
pwk=buf[td];
return true;
}
const int count() {
return ( h.load()+capacity-t.load() ) % capacity;
}
bool push(const T &pwk) {
const int tt=t.load();
int hd=h.load();
if( capacity - (hd+capacity-tt) % capacity < gap) {
// Buffer is too full to insert
// return false;
// or delete last record as below
int nt=t.fetch_add(1);
if(nt==capacity-1) t.fetch_sub(capacity);
}
int nwh=wh.fetch_add(1);
if(nwh==capacity-1) wh.fetch_sub(capacity);
buf[nwh%capacity]=pwk;
int nph=ph.fetch_add(1);
if(nph==capacity-1) ph.fetch_sub(capacity);
if(nwh==nph) {
int ohd=hd;
while(! h.compare_exchange_weak(hd, nwh) ) {
hd=h.load();
if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
}
}
return true;
}
};