Domanda

I have an application at $work where I have to move between two real-time threads that are scheduled at different frequencies. (The actual scheduling is beyond my control.) The application is hard real-time-ish (one of the threads has to drive a hardware interface), so the data transfer between the threads should be lock-free and wait-free to the extent possible.

It is important to note that only one block of data needs to be transferred: because the two threads run at different rates, there will be times when two iterations of the faster thread are completed between two wakeups of the slower thread; in this case it is OK to overwrite the data in the write buffer so that the slower thread gets only the latest data.

In other words, instead of a queue a double buffered solution suffices. The two buffers are allocated during initialization, and the reader and write threads can call methods of the class to get pointers to one of these buffers.

C++ code:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

To avoid stale data in the reader thread the payload structure is versioned. To facilitate bidirectional data transfer between the threads, two instances of the above monstrosity are used, in opposite directions.

Questions:

  • Is this scheme threadsafe? If it's broken, where?
  • Can it be done without the mutex? Perhaps with just memory barriers or CAS instructions?
  • Can it be made better?
È stato utile?

Soluzione

Very interesting problem! Way trickier than I first thought :-) I like lock-free solutions, so I've tried to work one out below.

There are many ways to think about this system. You can model it as a fixed-size circular buffer/queue (with two entries), but then you lose the ability to update the next available value for consumption, since you don't know if the consumer has started to read the most recently published value or is still (potentially) reading the previous one. So extra state is needed beyond that of a standard ring buffer in order to reach a more optimal solution.

First note that there is always a cell that the producer can safely write to at any given point in time; if one cell is being read by the consumer, the other can be written to. Let's call the cell that can be safely written to the "active" cell (the cell that can be potentially read from is whatever cell isn't the active one). The active cell can only be switched if the other cell is not currently being read from.

Unlike the active cell, which can always be written to, the non-active cell can only be read from if it contains a value; once that value is consumed, it's gone. (This means that livelock is avoided in the case of an aggressive producer; at some point, the consumer will have emptied a cell and will stop touching the cells. Once that happens, the producer can definitely publish a value, whereas before that point, it can only publish a value (change the active cell) if the consumer is not in the middle of a read.)

If there is a value that's ready to be consumed, only the consumer can change that fact (for the non-active cell, anyway); subsequent productions may change which cell is active and the published value, but a value will always be ready to be read until it's consumed.

Once the producer is done writing to the active cell, it can "publish" this value by changing which cell is the active one (swapping the index), provided the consumer is not in the middle of reading the other cell. If the consumer is in the middle of reading the other cell, the swap cannot occur, but in that case the consumer can swap after it's done reading the value, provided the producer is not in the middle of a write (and if it is, the producer will swap once it's done). In fact, in general the consumer can always swap after it's done reading (if it's the only one accessing the system) because spurious swaps by the consumer are benign: if there is something in the other cell, then swapping will cause that to be read next, and if there isn't, swapping affects nothing.

So, we need a shared variable to track what the active cell is, and we also need a way for both the producer and consumer to indicate if they're in the middle of an operation. We can store these three pieces of state into one atomic variable in order to be able to affect them all at once (atomically). We also need a way for the consumer to check if there's anything in the non-active cell in the first place, and for both threads to modify that state as appropriate. I tried a few other approaches, but in the end the easiest was just to include this information in the other atomic variable too. This makes things much simpler to reason about, since all state changes in the system are atomic this way.

I've come up with a wait-free implementation (lock-free, and all operations complete in a bounded number of instructions).

Code time!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

Note that the semantics are such that the consumer can never read a given value twice, and a value it does read is always newer than the last value it read. It's also fairly efficient in memory usage (two buffers, like your original solution). I avoided CAS loops because they're generally less efficient than a single atomic operation under contention.

If you decide use the above code, I suggest you write some comprehensive (threaded) unit tests for it first. And proper benchmarks. I did test it, but only just barely. Let me know if you find any bugs :-)

My unit test:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

As for your original implementation, I only looked at it cursorily (it's much more fun to design new stuff, heh), but david.pfx's answer seems to address that part of your question.

Altri suggerimenti

Yes, I think it's broken.

If the reader does a start/end/start in succession it will update its read index to the write index, and potentially read data from the write index, even if the write is busy.

The problem essentially is that the writer doesn't know which buffer the reader will use, so the writer should ensure that both buffers are valid at all times. It can't do that, if it's going to take any time to write data into a buffer [unless I misunderstood some of the logic that isn't shown here.]

Yes, I think it can be done without locks, using CAS or equivalent logic. I'm not going to try to express an algorithm in this space. I'm confident that it exists, but not that I can write it out correctly first time. And a bit of web searching turned up some plausible candidates. Wait-free IPC using CAS appears to be quite an interesting topic and the subject of some research.


After some further thought, the algorithm is as follows. You need:

  • 3 buffers: one for the writer, one for the reader to use, and one extra. The buffers are ordered: they form a ring (but see note).
  • A status for each buffer: free, full, writing, reading.
  • A function that can inspect the status of the buffer and conditionally change the status to a different value in a single atomic operation. I shall use CSET for that.

Writer:

Find the first buffer that is FREE or FULL
  Fail: assert (should never fail, reader can only use one buffer)
  CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

Reader:

Find first buffer that is FULL
    Fail: wait (writer may be slow)
    CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

Note: This algorithm does not guarantee that buffers are treated strictly in order of arrival, and no simple change will make it do so. If this is important, the algorithm should be enhanced with a sequence number on the buffer, set by the writer so that the most recent buffer can be chosen by the reader.

I leave the code as an implementation detail.


The CSET function is non-trivial. It has to atomically test that a particular shared memory location is equal to an expected value and if so change it to a new value. It returns true if it successfully made the change and false otherwise. The implementation must avoid race conditions if two threads access the same location at the same time (and possibly on different processors).

The C++ standard atomic operations library contains a set of atomic_compare_exchange functions which should serve the purpose, if available.

Here a version using InterlockedExchangePointer() and SLISTs.

This solution does not support re-reading of last buffer. But if that is needed it can be done on reader side by means of a copy and a if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... }.
This is not done because it is hard to add, but because it is not very realistic. Imagine your last known value turns older and older - seconds, days, weeks. It is unlikely that the application will still want to use it. So, factoring re-read functionality into the double buffer code takes away flexibility from the application.

The double buffer has 1 read pointer member. Whenever beginRead() is called, this value is returned and atomically replaced with NULL. Think of it as "The reader TAKES the buffer."
With endRead(), the reader returns the buffer and it is added to the SLIST, containing the available buffers for write operations.

Initially, both buffers are added to the SLIST, the read pointer is NULL.

beginWrite() pops the next available buffer from the SLIST. And this value cannot ever be NULL, because of the way endWrite() is implemented.

Last not least, endWrite() atomically swaps the read pointer with the returned, freshly written buffer and if the read pointer was not NULL, it pushes it onto the SLIST.

So, even if the reader side never reads, the writer side never runs out of buffers. When the reader reads, it gets the latest known value (once!).

What this implementation is not safe against is if there are multiple concurrent readers or writers. But that was not the objective in the first place.

On the ugly side, the Buffers need to be structures with some SLIST_HEADER member on top.

Here, the code, but keep in mind that it is not my fault if your mars rover lands on venus!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
    SLIST_ENTRY listNode;
    uint8_t data[MAX_DATA_SIZE];
    size_t length;
} DataItem_t;

class CDoubleBuffer
{
    SLIST_HEADER m_writePointers;
    DataItem_t m_buffers[2];
    volatile DataItem_t *m_readPointer;

public:
    CDoubleBuffer()
        : m_writePointers()
        , m_buffers()
        , m_readPointer(NULL)
    {
        InitializeSListHead(&m_writePointers);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
    }
    DataItem_t *beginRead()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
        return result;
    }
    void endRead(DataItem_t *dataItem)
    {
        if (NULL != dataItem)
        {
            InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
        }
    }
    DataItem_t *beginWrite()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
        return result;
    }
    void endWrite(DataItem_t *dataItem)
    {
        DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
        if (NULL != oldReadPointer)
        {
            InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
        }
    }
};

And here the test code for it. (For both, the above and the test code you need <windows.h> and <assert.h>.)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top