Question

I'm trying to implement a high-performance blocking queue backed by a circular buffer on top of pthreads, semaphore.h and the gcc atomic builtins. The queue needs to handle multiple simultaneous readers and writers from different threads.

I've isolated some sort of race condition, and I'm not sure if it's a faulty assumption about the behavior of some of the atomic operations and semaphores, or whether my design is fundamentally flawed.

I've extracted and simplified it into the standalone example below. I would expect this program never to return. However, it does return after a few hundred thousand iterations, reporting corruption in the queue.

For exposition, the example below doesn't actually store anything: it sets a cell to 1 where the real data would go, and to 0 to mark it empty. There is a counting semaphore (vacancies) representing the number of vacant cells, and another counting semaphore (occupants) representing the number of occupied cells.

Writers do the following:

  1. decrement vacancies
  2. atomically get next head index (mod queue size)
  3. write to it
  4. increment occupants

Readers do the opposite:

  1. decrement occupants
  2. atomically get next tail index (mod queue size)
  3. read from it
  4. increment vacancies

I would expect that, given the above, exactly one thread can be reading or writing any given cell at a time.

Any ideas about why it doesn't work, or debugging strategies, would be appreciated. Code and output below...

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // atomic post increment
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // atomic post increment
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // atomic compare and assign
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected
    void corrupt(const char* action, unsigned int i)
    {
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
} 

int main()
{
    pthread_t id;

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id, NULL, &Sink, 0))
            abort();

    pthread_join(id, NULL); // block on the last thread, which never exits
}

Compile the above as follows:

    $ g++ -pthread AboveCode.cpp
    $ ./a.out

The output is different every time, but here is one example:

    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

My system is Ubuntu 11.10 on Intel Core 2:

    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP \
      Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad  CPU   Q9300  @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks, Andrew.


Solution

One possible scenario, traced step by step for two writer threads (W0, W1) and one reader thread (R0): W0 enters put() before W1, is interrupted by the OS or the hardware, and finishes later.

        W0 (core 0)               W1 (core 1)                R0
t0         ----                      ----      blocked on occupants.wait() in take()
t1      entered put()                ----                    ----
t2      vacancies.wait()           entered put()             ----
t3      got new_head = 1           vacancies.wait()          ----
t4     <interrupted by OS>         got new_head = 2          ----
t5                                 wrote 1 to cell[2]        ----
t6                                 occupants.post()          ----
t7                                 exited put()             woke up
t8                                   ----               got new_tail = 1
t9     <still interrupted>           ----       read 0 from cell[1]  !! corruption !!
t10     wrote 1 to cell[1]
t11     occupants.post()
t12     exited put()
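The interleaving above can be replayed deterministically in a single thread, which makes it easier to see that the semaphore count stays consistent while the cell contents do not. This is a hypothetical sketch: `replay_trace` and `ReplayResult` are illustrative names, the `occupants` semaphore is modelled as a plain integer, and the counters start at 1 only so that the slot numbers match the table.

```cpp
#include <cassert>

// Hypothetical single-threaded replay of the W0/W1/R0 interleaving.
// Counters start at 1 so the slots match the trace (new_head = 1, 2; new_tail = 1).
const unsigned CAPACITY = 8;

struct ReplayResult
{
    unsigned w0_slot;
    unsigned w1_slot;
    unsigned r0_slot;
    int value_seen_by_r0; // 1 = occupied, 0 = vacant (as in the question)
};

ReplayResult replay_trace()
{
    int cell[CAPACITY] = {0};
    unsigned head = 1, tail = 1;
    int occupants = 0; // stands in for the occupants semaphore

    // t2-t3: W0 passes vacancies.wait() and claims a head slot; t4: it is preempted.
    unsigned w0 = head++ % CAPACITY;
    // t3-t4: W1 claims the next head slot.
    unsigned w1 = head++ % CAPACITY;
    // t5-t6: W1 writes its own cell and posts occupants.
    cell[w1] = 1;
    ++occupants;
    // t7-t8: R0 wakes up, consumes that token and claims the oldest tail slot.
    --occupants;
    assert(occupants == 0); // the count is consistent...
    unsigned r0 = tail++ % CAPACITY;
    // t9: ...but R0 reads a cell W0 has claimed and not yet written.
    int seen = cell[r0];
    // t10-t11: W0 finally writes and posts, too late for R0.
    cell[w0] = 1;
    ++occupants;

    ReplayResult r = { w0, w1, r0, seen };
    return r;
}
```

The replay shows that `occupants` only counts how many writers have *finished*; it says nothing about *which* slot was finished. R0 is handed the oldest claimed slot (cell[1]), not the one W1 actually filled (cell[2]).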

Other tips

From a design point of view, I would treat the whole queue as a single shared resource and protect it with one mutex.

Writers do the following:

  1. take the mutex
  2. write to the queue (including handling of indexes)
  3. free the mutex

Readers do the following:

  1. take the mutex
  2. read from the queue (including handling of indexes)
  3. free the mutex
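A minimal sketch of that design, assuming POSIX condition variables replace the two semaphores for blocking (a swapped-in choice; the answer only specifies the mutex). `MutexQueue`, `run_stress` and the `int` payload are hypothetical names for illustration; compile with `-pthread` as in the question.

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical bounded blocking queue: one mutex guards the buffer and all
// indexes; two condition variables let callers block while full or empty.
class MutexQueue
{
public:
    static const unsigned CAPACITY = 8;

    MutexQueue() : head(0), tail(0), count(0)
    {
        pthread_mutex_init(&lock, 0);
        pthread_cond_init(&not_full, 0);
        pthread_cond_init(&not_empty, 0);
    }
    ~MutexQueue()
    {
        pthread_cond_destroy(&not_empty);
        pthread_cond_destroy(&not_full);
        pthread_mutex_destroy(&lock);
    }

    void put(int value)
    {
        pthread_mutex_lock(&lock);              // 1. take the mutex
        while (count == CAPACITY)               // loop guards against spurious wakeups
            pthread_cond_wait(&not_full, &lock);
        cell[head] = value;                     // 2. write, including the index update
        head = (head + 1) % CAPACITY;
        ++count;
        pthread_cond_signal(&not_empty);
        pthread_mutex_unlock(&lock);            // 3. free the mutex
    }

    int take()
    {
        pthread_mutex_lock(&lock);
        while (count == 0)
            pthread_cond_wait(&not_empty, &lock);
        int value = cell[tail];
        tail = (tail + 1) % CAPACITY;
        --count;
        pthread_cond_signal(&not_full);
        pthread_mutex_unlock(&lock);
        return value;
    }

private:
    pthread_mutex_t lock;
    pthread_cond_t not_full, not_empty;
    unsigned head, tail, count;
    int cell[CAPACITY];
};

// Hypothetical stress helper: `threads` producers each put 1..n and
// `threads` consumers each take n items; returns the sum of everything taken.
struct StressArgs { MutexQueue* q; int n; long sum; };

static void* producer(void* p)
{
    StressArgs* a = static_cast<StressArgs*>(p);
    for (int i = 1; i <= a->n; ++i)
        a->q->put(i);
    return 0;
}

static void* consumer(void* p)
{
    StressArgs* a = static_cast<StressArgs*>(p);
    for (int i = 0; i < a->n; ++i)
        a->sum += a->q->take();
    return 0;
}

long run_stress(int threads, int n)
{
    assert(threads <= 16);
    MutexQueue q;
    StressArgs prod[16], cons[16];
    pthread_t ids[32];
    for (int i = 0; i < threads; ++i) {
        StressArgs a = { &q, n, 0 };
        prod[i] = a;
        cons[i] = a;
        pthread_create(&ids[i], 0, producer, &prod[i]);
        pthread_create(&ids[threads + i], 0, consumer, &cons[i]);
    }
    for (int i = 0; i < 2 * threads; ++i)
        pthread_join(ids[i], 0);
    long total = 0;
    for (int i = 0; i < threads; ++i)
        total += cons[i].sum;
    return total;
}
```

Because head, tail and count change only while the mutex is held, a reader can never see a slot that a writer has claimed but not yet filled. The cost is that all readers and writers serialize on one lock, which may or may not matter for the throughput the question is aiming for.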

I have a theory. It's a circular queue, so one reading thread may be getting lapped. Say a reader takes index 0 and, before it does anything else, loses the CPU. Another reader thread takes index 1, then 2, then 3, ... then 7, then 0 again. When the first reader resumes, both threads think they have exclusive access to index 0. I'm not sure how to prove it. Hope that helps.
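The arithmetic behind that theory is easy to check: the slot is the ticket returned by `__sync_fetch_and_add` reduced modulo the capacity, so any two tickets that differ by a multiple of `QUEUE_CAPACITY` name the same cell. The helper names below are hypothetical, and the sketch only shows that such a collision is possible in principle; it does not show that the semaphores actually allow two readers to hold such tickets at once.

```cpp
#include <cassert>

// Hypothetical illustration of the "lapping" theory: slots are derived from
// ever-increasing tickets, so ticket t and t + QUEUE_CAPACITY share a cell.
const unsigned QUEUE_CAPACITY = 8; // power of 2, as in the question

unsigned slot_of(unsigned ticket)
{
    return ticket % QUEUE_CAPACITY;
}

// True if two readers holding these distinct tickets would both target one cell.
bool same_cell(unsigned ticket_a, unsigned ticket_b)
{
    return ticket_a != ticket_b && slot_of(ticket_a) == slot_of(ticket_b);
}
```

The same arithmetic is why the question requires a power-of-2 capacity: when the unsigned counter wraps from its maximum value back to 0, the slot steps from 7 to 0 exactly as it would for any other increment.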

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow