Pregunta

I am just looking for feedback (obvious flaws/ways to improve it) on my attempt to implement atomic read/writes on a structure.

There will be one writing thread and multiple reading threads. The aim is to prevent a reader from getting an inconsistent view of the structure whilst not impeding the writer too much.

I am using the fetch-and-add atomic primitive, in this case provided by the Qt framework.

For example:

/* global */
OneWriterAtomicState<Point> atomicState;

/* Writer */
while(true) {  
  MyStruct s = atomicState.getState()
  s.x += 2; s.y += 2;
  atomicState.setState(s);
}

/* Reader */
while(true) {  
  MyStruct s = atomicState.getState()
  drawBox(s.x,s.y);
}

OneWriterAtomicState Implementation:

template <class T>
class OneWriterAtomicState
{
public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
    }

    void setState(T& state) {
        this->seqNumber.fetchAndAddOrdered(1);
        this->state = state;
        this->seqNumber.fetchAndAddOrdered(1);
    }

    T getState(){
        T result;
        int seq;
        bool seq_changed = true;

        /* repeat while seq is ODD or if seq changes during read operation */
        while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
            result = this->state;
            seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
        }
        return result;
    }


private:
    QAtomicInt seqNumber;
    T state;
} 

Here is version two (memcpy, reader yielding, hopefully fixed getState() ):

template <class T>
class OneWriterAtomicState
{

public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
        /* Force a compile-time error if T is NOT a type we can copy with memcpy */
        Q_STATIC_ASSERT(!QTypeInfo<T>::isStatic);
    }

    void setState(T* state) {
        this->seqNumber.fetchAndAddOrdered(1);
        memcpy(&this->state,state,sizeof(T));
        this->seqNumber.fetchAndAddOrdered(1);
    }

    void getState(T* result){
        int seq_before;
        int seq_after  = this->seqNumber.fetchAndAddOrdered(0);
        bool seq_changed = true;
        bool firstIteration = true;

        /* repeat while seq_before is ODD or if seq changes during read operation */
        while( ((seq_before=seq_after) & 0x01) || seq_changed ) {

            /* Dont want to yield on first attempt */
            if(!firstIteration) {
                /* Give the writer a chance to finish */
                QThread::yieldCurrentThread();
            } else firstIteration = false;

            memcpy(result,&this->state,sizeof(T));
            seq_after = this->seqNumber.fetchAndAddOrdered(0);
            seq_changed = (seq_before!=seq_after);
        }
    }

    bool isInitialized() {  return (seqNumber>0); }

private:
    QAtomicInt seqNumber;
    T state;
} ;

#endif // ONEWRITERATOMICSTATE_H
¿Fue útil?

Solución

The algorithm is not quite correct. Here's one possible interleaving of threads where the reader gets inconsistent data:

state initialized to {0,0} and seqNumber to 0

Writer:
seqNumber = 1;
state.x = 1;

Reader:
seq = seqNumber; //1
result = state; //{1,0}
seq_changed = (seqNumber != seq); //false

Writer:
state.y = 1;
seqNumber = 2;

Reader:
jumps back to the start of the loop
seq = seqNumber; //2
steps out of the loop because seq == 2 and seq_changed == false

So the problem is that seqNumber is read in two places and it's possible for the writer to update the value between the reads.

while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
    result = this->state;
    seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
    //If writer updates seqNumber here to even number bad things may happen
}

It should only be read once per iteration:

T getState(){
    T result;
    int seq;
    int newseq = seqNumber.fetchAndAddOrdered(0);
    bool seq_changed = true;

    while( (seq = newseq) & 0x01 || seq_changed ) {
        result = state;
        newseq = seqNumber.fetchAndAddOrdered(0);
        seq_changed = (newseq != seq);
    }
    return result;
}

I believe this should work correctly, but I won't guarantee anything. :) At the very least you should write a test program, like the one in your example but adding a check for inconsistent values in the reader.

One thing worth considering is that using atomic increment (fetchAndAdd) is kind of an overkill. There's only one thread writing seqNumber, so you could do with simple atomic store-release and load-acquire operations, and they can be implemented much faster on many processors. However I don't know if those operations are possible with QAtomicInt; the documentation is very unclear about it.

edit: and wilx is right, T needs to be a trivially copyable type

Otros consejos

I think that this will only work if T's copy assignment operator is primitive and does basically only a bitwise copy. For any more complicated T you could end up getting inconsistent state during the execution of the result = this->state;.

So, I would suggest using some kind of rwlocks with writer preference.

If you have a priority based thread scheduling and the reader has a higher priority than the writer you might run into a livelock. Imagine the writer starts to write the value and then the reader comes in doing its active waiting. Due to the higher priority of the reader the writer will never get a chance finish the writing.

A solution would be to add a tiny delay to the waiting loop.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top