Question

I have written a synchronised queue for holding integers and am faced with a weird race condition that I cannot understand.

Please do NOT post solutions, I know how to fix the code and make it work, I want to know what the race condition is and why it is not working as intended. Please help me understand what is going wrong and why.

First the important part of the code:

This assumes that the application will never put in more than the buffer can hold, thus there is no check for the current buffer size.

/*
 * Multi-writer put: spins until a compare-and-swap installs `value` into the
 * cell selected by the shared write_offset, then advances write_offset.
 * A value of 0 is ignored because 0 marks an empty cell.
 * No check is made for whether the buffer is already full.
 */
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) { // 0 values are not allowed to be put in
        size_t write_offset; // holds a current copy of the array index where to put the element
        for (;;) {
            // retrieve up to date write_offset copy and apply power-of-two modulus
            write_offset = int_queue->write_offset & int_queue->modulus; 
            // if that cell currently holds 0 (thus is empty)
            if (!int_queue->int_container[write_offset])
                // attempt to compare and swap the new value in
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    // if successful then this thread was the first to do this, terminate the loop, else try again
                    break;
        }

        // increment write offset signaling other threads where the next free cell is
        // (this is a plain ++ on a volatile field, not an atomic builtin)
        int_queue->write_offset++;
        // doing a synchronised increment here does not fix the race condition
    }
}

This code has a rare race condition in which the write_offset appears not to be incremented. Tested on OS X gcc 4.2, Intel Core i5 quadcore and Linux Intel C Compiler 12 on RedHat 2.6.32 Intel(R) Xeon(R). Both produce race conditions.

Full source with test cases:

#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdint.h>

// #include "int_queue.h"
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>

#ifndef INT_QUEUE_H
#define INT_QUEUE_H

#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif

/*
 * Ring buffer of long ints stored inline after the header.
 * A cell value of 0 means "empty"; the offsets grow monotonically and are
 * reduced to a cell index with `offset & modulus`.
 */
struct int_queue_s {
    size_t size;                        /* number of values requested at creation */
    size_t modulus;                     /* bit mask applied to the offsets (cell count - 1) */
    volatile size_t read_offset;        /* running count of values taken out */
    volatile size_t write_offset;       /* running count of values put in */
    volatile long int int_container[];  /* C99 flexible array member: the cells */
};

/*
 * Unsynchronised put: stores `value` in the cell addressed by write_offset
 * and then advances write_offset. A value of 0 is silently ignored.
 * No check is made for whether the target cell is already occupied.
 */
static inline void int_queue_put(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (!value)
        return;

    const size_t slot = int_queue->write_offset & int_queue->modulus;
    int_queue->int_container[slot] = value;
    int_queue->write_offset++;
}

/*
 * Multi-writer put: spins until a compare-and-swap installs `value` into the
 * cell selected by the shared write_offset, then advances write_offset.
 * A value of 0 is ignored because 0 marks an empty cell.
 * No check is made for whether the buffer is already full.
 */
static inline void int_queue_put_sync(struct int_queue_s * const __restrict int_queue, const long int value ) {
    if (value) {
        size_t write_offset;
        for (;;) {
            /* re-read the shared offset on every attempt and mask it to a cell index */
            write_offset = int_queue->write_offset & int_queue->modulus;
            /* cheap pre-check: only attempt the CAS when the cell looks empty */
            if (!int_queue->int_container[write_offset])
                /* claim the cell only if it still holds 0 */
                if (__sync_bool_compare_and_swap(&(int_queue->int_container[write_offset]), (long int)0, value))
                    break;
        }

        /*
         * Plain ++ on a volatile field, not an atomic builtin.
         * NOTE(review): nothing ties this increment to the cell claimed above;
         * other threads may have changed write_offset or the cell in between.
         */
        int_queue->write_offset++;
    }
}

/*
 * Unsynchronised get: returns 0 when read_offset equals write_offset,
 * otherwise takes the value from the cell addressed by read_offset,
 * resets that cell to 0, advances read_offset and returns the value.
 */
static inline long int int_queue_get(struct int_queue_s * const __restrict int_queue) {
    const size_t slot = int_queue->read_offset & int_queue->modulus;

    if (int_queue->write_offset == int_queue->read_offset)
        return 0;

    const long int taken = int_queue->int_container[slot];
    int_queue->int_container[slot] = 0;
    int_queue->read_offset++;
    return taken;
}

/*
 * Multi-reader get: returns 0 when write_offset equals the sampled
 * read_offset; otherwise spins until a compare-and-swap replaces a non-zero
 * cell value with 0, then advances read_offset and returns that value.
 */
static inline long int int_queue_get_sync(struct int_queue_s * const __restrict int_queue) {
    size_t read_offset;
    long int volatile value;
    for (;;) {

        /* re-sample the shared read offset on every attempt */
        read_offset = int_queue->read_offset;
        /* equal offsets are treated as "nothing to read" */
        if (int_queue->write_offset == read_offset)
            return 0;
        /* reduce the running offset to a cell index */
        read_offset &= int_queue->modulus;
        value = int_queue->int_container[read_offset];
        /* a 0 cell is empty: skip the CAS and retry */
        if (value)
            /* take the value only if the cell still holds what was just read */
            if (__sync_bool_compare_and_swap(&(int_queue->int_container[read_offset]), (long int)value, (long int)0))
                break;
    }
    /*
     * Plain ++ on a volatile field, not an atomic builtin.
     * NOTE(review): not tied to the cell emptied above; other threads may
     * have changed read_offset in between.
     */
    int_queue->read_offset++;
    return value;
}

/*
 * Allocates a queue with mmap and pre-fills it with the values 1..num_values.
 *
 * The cell count is the smallest power of two strictly greater than
 * num_values + 1, so that `offset & modulus` can serve as the cell index.
 *
 * Returns the new queue, or NULL when the requested size cannot be
 * represented, the page size cannot be determined, or mmap fails.
 * No matching destroy function is visible here; the mapping stays in place.
 */
static inline struct int_queue_s * int_queue_create(size_t num_values) {
    struct int_queue_s * int_queue;

    /* num_values + 1 must not wrap, and doubling its top bit must not overflow */
    if (num_values >= SIZE_MAX / 2)
        return NULL;

    /* clear the lowest set bit until only the highest one is left, then double it */
    size_t modulus;
    size_t temp = num_values + 1;
    do {
        modulus = temp;
        temp &= temp - 1;
    } while (temp);
    modulus <<= 1;

    /* reject sizes whose byte count would overflow size_t */
    const size_t cell_size = sizeof(int_queue->int_container[0]);
    if (modulus > (SIZE_MAX - sizeof(*int_queue)) / cell_size)
        return NULL;
    size_t int_queue_mem = sizeof(*int_queue) + cell_size * modulus;

    /* round the mapping up to a whole number of pages; sysconf returns -1 on error */
    const long page_size = sysconf(_SC_PAGE_SIZE);
    if (page_size <= 0)
        return NULL;
    const size_t remainder = int_queue_mem % (size_t)page_size;
    if (remainder) {
        const size_t padding = (size_t)page_size - remainder;
        if (int_queue_mem > SIZE_MAX - padding)
            return NULL;
        int_queue_mem += padding;
    }

    int_queue = mmap(NULL, int_queue_mem, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE , -1, 0);
    if (int_queue == MAP_FAILED)
        return NULL;

    int_queue->modulus = modulus - 1;
    int_queue->read_offset = 0;
    int_queue->write_offset = 0;
    int_queue->size = num_values;

    /* every cell starts out as 0, i.e. empty */
    memset((void*)int_queue->int_container, 0, cell_size * modulus);

    /* pre-fill with 1..num_values (0 is reserved as the empty marker) */
    for (size_t i = 1; i <= num_values; i++) {
        int_queue_put(int_queue, (long int)i);
    }

    return int_queue;
}


#endif


/*
 * Worker body for the stress test: 10000000 times, take a value from the
 * queue with int_queue_get_sync (retrying while it returns 0), sleep for a
 * pseudo-random 0..4095 microseconds, then put the same value back with
 * int_queue_put_sync. Progress is printed to stdout. Always returns NULL.
 */
void * test_int_queue_thread(struct int_queue_s * int_queue) {
    for (size_t i = 0; i < 10000000; i++) {
        long int value;

        /* counts how many extra polls were needed before a value showed up */
        int waited = -1;
        for (;;) {
            value = int_queue_get_sync(int_queue);
            waited++;
            if (value)
                break;
        }

        if (waited > 0)
            printf("waited %d cycles to get a new value\n", waited);

        printf("thread %p got value %ld, i = %zu\n", (void *)pthread_self(), value, i);

        /* hold the value for a short random time before returning it */
        const int timesleep = rand() & 0xFFF;
        usleep(timesleep);

        int_queue_put_sync(int_queue, value);

        printf("thread %p put value %ld back, i = %zu\n", (void *)pthread_self(), value, i);
    }

    return NULL;
}


/*
 * Test driver.
 *
 * 1. Creates a queue pre-filled with two values.
 * 2. Single-threaded warm-up: takes a value and puts it back 100 times using
 *    the unsynchronised get/put.
 * 3. Starts 4 worker threads running test_int_queue_thread and waits for them.
 *
 * Returns 0 on success, -1 if the queue or a thread could not be created.
 */
int main(int argc, char ** argv) {
    enum { WARMUP_ROUNDS = 100, NUM_THREADS = 4 };

    (void)argc;
    (void)argv;

    struct int_queue_s * int_queue = int_queue_create(2);

    if (!int_queue) {
        fprintf(stderr, "error initializing int_queue\n");
        return -1;
    }

    srand(0);

    size_t i;

    for (i = 0; i < WARMUP_ROUNDS; i++) {
        const long int value = int_queue_get(int_queue);

        if (!value) {
            printf("error getting value\n");
        }
        else {
            printf("got value %ld\n", value);
        }

        int_queue_put(int_queue, value);

        printf("put value %ld back successfully\n", value);
    }

    pthread_t threads[NUM_THREADS];
    size_t started = 0;
    int rc = 0;

    for (i = 0; i < NUM_THREADS; i++) {
        rc = pthread_create(threads + i, NULL, (void * (*)(void *))test_int_queue_thread, int_queue);
        if (rc != 0) {
            /* stop here so only successfully created threads are joined below */
            fprintf(stderr, "error creating thread %zu (error %d)\n", i, rc);
            break;
        }
        started++;
    }

    for (i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    return rc ? -1 : 0;
}
Was it helpful?

Solution

Interesting question. Here is a wild guess. :-)

It seems you need some synchronization between your read_offset and write_offset.

For example, here is a race that may be related or not. Between your compare-and-swap and the write_offset increment you may have a reader come in and set the value back to zero.

Writer-1: get write_offset=0
Writer-2: get write_offset=0
Writer-1: compare-and-swap at offset=0
Writer-1: Set write_offset=1
Reader-1: compare-and-swap at offset=0 (sets it back to zero)
Writer-2: compare-and-swap at offset=0 again even though write_offset=1
Writer-2: Set write_offset=2

OTHER TIPS

I believe that int_queue->write_offset++; is the problem: if two threads execute this instruction simultaneously, they will both load the same value from memory, increment it, and store the same result back (such that the variable only increases by one).

In my opinion, the statements

int_queue->write_offset++;

and

write_offset = int_queue->write_offset & int_queue->modulus; 

are not thread-safe.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top