Pergunta

Eu estou tentando encontrar uma maneira de fazer um Bloqueio Livre OU Não-bloqueio maneira de fazer um Buffer em Anel por um único consumidor / consumidor único que irá gravar os dados mais antigos int buffer.Eu li um monte de lock-free algoritmos que funcionam quando você "return false" se o buffer está cheio--ou seja, não adicionar;mas eu não posso encontrar até mesmo pseudo-código, que fala sobre como fazê-lo quando você precisa substituir os dados mais antigos.

Eu estou usando o GCC 4.1.2 (restrição no trabalho, eu não consigo atualizar a versão...) e eu tenho o Impulso de bibliotecas, e no passado eu fiz o meu próprio Atômica< T > tipo de variável que segue muito de perto as próximas especificação (não é perfeito, mas é thread-safe e faz o que eu preciso).

Quando eu pensei sobre isso, eu percebi que usando estas atômicos realmente deve cuidar do problema.alguns áspero psuedo-código para o que eu estava pensando:

template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

Tanto quanto eu posso dizer, não há situações de impasse aqui, então estamos seguros de que (Se a minha implementação acima é errado, mesmo em sua pseudo-código, leve, a crítica construtiva é sempre bem-vindo).No entanto,a GRANDE condição de corrida que eu encontrei foi:

vamos assumir que o buffer está cheio.que é, writeIndex +1 = readIndex;(1) ocorre, assim como consumir está sendo chamado.e é verdade (4) é falso, então, passamos a ler do buffer (5) ocorre, e o readIndex é avançada (portanto, há, de fato, o espaço em buffer (2) ocorre, avançando readIndex NOVAMENTE, assim, PERDER o valor.

Basicamente, é um clássico problema do writter deve modificar o leitor, fazendo com que uma condição de corrida.Sem, na verdade, o bloqueio de toda a lista toda vez que eu acesso ele, eu não posso pensar de uma maneira para evitar que isso aconteça.O que eu estou ausente??

Foi útil?

Solução

  1. Comece com um único produtor/fila de consumidores múltiplos com garantias de progresso apropriadas.
  2. Se a fila estiver cheia e o impulso falharia, apareça um valor. Depois, haverá espaço para aumentar o novo valor.

Outras dicas

O que estou perdendo??

Grande quantidade:

  • Digamos que você consuma enquanto está sendo substituído pelo produtor - como você está detectando/lidando com isso?
    • muitas opções - por exemplo, do { copie o valor; A cópia da verificação tem integridade usando a sequência de modificação NUM etc. } while (corrupto)
  • Usar números atômicos não é suficiente - você também precisa usar loops no estilo CAS para afetar os incrementos do índice (embora eu assuma que você sabe que, já que você diz que já leu extensivamente nisso)
  • barreiras de memória

Mas, vamos escrever isso como estar abaixo do seu nível de pseudo-código e considerar sua pergunta explícita:

  • O ponto (5) exigirá uma operação CAS. Se o ReadIndex foi amostrado/copiado corretamente no topo consume() - antes do (possivelmente corrupto) t foi copiado - então a instrução CAS falhará se já foi incrementado pelo produtor. Em vez da reamosa usual e tente novamente CAS, continue.

Aqui está um código de buffer circular atômica variáveis de eu ter criado recentemente.Eu modifiquei para "substituir" os dados em vez de retornar false.Aviso de isenção - não é o grau de produção testado ainda.

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top