Very interesting problem! Way trickier than I first thought :-) I like lock-free solutions, so I've tried to work one out below.
There are many ways to think about this system. You can model it as a fixed-size circular buffer/queue with two entries, but then you lose the ability to update the next value available for consumption, since you don't know whether the consumer has started to read the most recently published value or is still (potentially) reading the previous one. So extra state beyond that of a standard ring buffer is needed to do better.
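For reference, the semantics being aimed for can be pinned down with a plain mutex: the producer can always publish, and the consumer sees only the newest unread value, at most once. This is a minimal sketch under my own naming. `LatestValueSlot`, `publish` and `try_consume` are hypothetical and not from the question, and the sketch needs C++17 for `std::optional`. It can serve as a baseline to test or benchmark a lock-free version against:

```cpp
#include <cassert>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical mutex-based baseline: a single slot that only ever holds the
// newest unread value. Not lock-free; it just pins down the intended semantics.
template <typename T>
class LatestValueSlot {
public:
    // Producer: replace any unread value with a newer one.
    void publish(T value) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_value = std::move(value);
    }

    // Consumer: take the newest unread value, if any. Each published value
    // is returned at most once.
    std::optional<T> try_consume() {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::optional<T> result = std::move(m_value);
        m_value.reset();  // a moved-from optional still holds a value
        return result;
    }

private:
    std::mutex m_mutex;
    std::optional<T> m_value;
};

// Example: two publications before a read; only the newer one is seen.
inline void latest_value_slot_example() {
    LatestValueSlot<int> slot;
    slot.publish(1);
    slot.publish(2);
    assert(slot.try_consume() == 2);
    assert(!slot.try_consume().has_value());
}
```

Note that a large `T` is copied or moved while the lock is held here. Writing into one of two in-place buffers, as in the code below, avoids that.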
First, note that there is always a cell that the producer can safely write to at any given point in time: if one cell is being read by the consumer, the other can be written to. Let's call the cell that can safely be written to the "active" cell (the cell that can potentially be read from is whichever cell isn't the active one). The active cell can only be switched if the other cell is not currently being read from.
Unlike the active cell, which can always be written to, the non-active cell can only be read from if it contains a value; once that value is consumed, it's gone. (This means that livelock is avoided in the case of an aggressive producer: at some point, the consumer will have emptied a cell and will stop touching the cells. Once that happens, the producer can definitely publish a value, whereas before that point it can only publish a value, i.e. change the active cell, if the consumer is not in the middle of a read.)
If there is a value that's ready to be consumed, only the consumer can change that fact (for the non-active cell, anyway); subsequent productions may change which cell is active and the published value, but a value will always be ready to be read until it's consumed.
Once the producer is done writing to the active cell, it can "publish" this value by changing which cell is the active one (swapping the index), provided the consumer is not in the middle of reading the other cell. If the consumer is in the middle of reading the other cell, the swap cannot occur, but in that case the consumer can swap after it's done reading the value, provided the producer is not in the middle of a write (and if it is, the producer will swap once it's done). In fact, in general the consumer can always swap after it's done reading (if it's the only one accessing the system) because spurious swaps by the consumer are benign: if there is something in the other cell, then swapping will cause that to be read next, and if there isn't, swapping affects nothing.
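To make these publish/swap rules concrete, here is a single-threaded model of them. It is my own simplification, not the lock-free code: there are no atomics, the hypothetical `DoubleBufferModel` type and its methods are purely illustrative, and whether the other side is busy is passed in as a parameter instead of being tracked:

```cpp
#include <cassert>

// Single-threaded model of the protocol (a simplification: no atomics, and
// "is the other thread busy?" is passed in as a parameter).
struct DoubleBufferModel {
    int cells[2] = {0, 0};
    bool full[2] = {false, false};
    int active = 0;  // index of the cell the producer may write to

    // Producer: write into the active cell and mark it full, then publish by
    // swapping, but only if the consumer is not mid-read.
    void produce(int value, bool consumer_busy) {
        cells[active] = value;
        full[active] = true;
        if (!consumer_busy) swap_cells();
    }

    // Consumer: read the non-active cell if it holds a value; each value is
    // consumed at most once.
    bool consume(int& out) {
        int other = active ^ 1;
        if (!full[other]) return false;
        out = cells[other];
        full[other] = false;
        return true;
    }

    // Consumer: after a read, swap if the producer is not mid-write.
    void consumer_swap(bool producer_busy) {
        if (!producer_busy) swap_cells();
    }

private:
    void swap_cells() {
        active ^= 1;
        // The cell that becomes active will be overwritten, so any unread
        // (now stale) value in it is dropped.
        full[active] = false;
    }
};

// Walk through the cases described above.
inline void model_example() {
    DoubleBufferModel m;
    int v = 0;
    m.produce(1, /*consumer_busy=*/false);     // published immediately
    assert(m.consume(v) && v == 1);
    m.produce(2, /*consumer_busy=*/true);      // pretend a read is in progress: no swap
    assert(!m.consume(v));                     // 2 is not visible yet
    m.consumer_swap(/*producer_busy=*/false);  // the consumer publishes it instead
    assert(m.consume(v) && v == 2);
    m.consumer_swap(/*producer_busy=*/false);  // spurious swap...
    assert(!m.consume(v));                     // ...is harmless
}
```

Producing twice while the consumer is busy and then once more while it is idle leaves only the newest value readable, which matches the "latest value wins" behaviour.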
So, we need a shared variable to track which cell is active, and we also need a way for both the producer and the consumer to indicate whether they're in the middle of an operation (in the code, these two indicators are combined into a single count of concurrent users). We can store these pieces of state in one atomic variable so that they can all be changed at once (atomically). We also need a way for the consumer to check whether there's anything in the non-active cell in the first place, and for both threads to modify that state as appropriate. I tried a few other approaches, but in the end the easiest was to pack this information into the same atomic variable too. This makes things much simpler to reason about, since all state changes in the system are atomic this way.
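Concretely, the code below packs everything into the low five bits of one `std::uint32_t` (see the comment on `m_state`). Here is a sketch of that layout with named constants and decoding helpers. The names are hypothetical; the real code uses the raw literals `0x1`, `0x2`, `0x6`, `0x8` and `0x10`:

```cpp
#include <cstdint>

// Hypothetical names for the bits of the packed state word.
constexpr std::uint32_t kActiveBit = 0x1;   // bit 0: index of the active (writable) cell
constexpr std::uint32_t kUserOne   = 0x2;   // added/subtracted once per concurrent user
constexpr std::uint32_t kUsersMask = 0x6;   // bits 1-2: number of concurrent users (0..2)
constexpr std::uint32_t kFullCell0 = 0x8;   // bit 3: m_buf[0] holds an unread value
constexpr std::uint32_t kFullCell1 = 0x10;  // bit 4: m_buf[1] holds an unread value

constexpr std::uint32_t active_cell(std::uint32_t s) { return s & kActiveBit; }
constexpr std::uint32_t users(std::uint32_t s) { return (s & kUsersMask) / kUserOne; }
constexpr std::uint32_t full_bit(std::uint32_t cell) { return kFullCell0 << cell; }
constexpr bool is_full(std::uint32_t s, std::uint32_t cell) {
    return (s & full_bit(cell)) != 0;
}

// Example: cell 1 is active, both threads are inside, and cell 0 holds an
// unread value: 0x1 | 0x4 | 0x8.
static_assert(active_cell(0x1 | 0x4 | 0x8) == 1, "cell 1 is active");
static_assert(users(0x1 | 0x4 | 0x8) == 2, "two concurrent users");
static_assert(is_full(0x1 | 0x4 | 0x8, 0) && !is_full(0x1 | 0x4 | 0x8, 1),
              "only cell 0 is full");
```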
I've come up with a wait-free implementation (lock-free, and all operations complete in a bounded number of instructions).
Code time!
```cpp
#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0), m_readState(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done.
        // Acquire pairs with the release in end_reading(), so the
        // consumer's last read of this cell has finished before we
        // overwrite it.
        auto state = m_state.fetch_add(0x2, std::memory_order_acquire);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)
        auto state = m_state.load(std::memory_order_relaxed);
        // flag = the active cell's full bit if it isn't set yet, else 0,
        // so adding it marks the cell we just wrote as full
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        // Set the flag and drop our user count in one atomic step (when
        // flag is 0, unsigned wrap-around makes this a plain subtraction)
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read: the producer never turns off a full flag except as
        // part of a swap, and a swap always leaves the value it just
        // published flagged in the other cell. So the active cell may
        // change before our fetch_add below, but there will still be
        // a value to read, just possibly in a different cell.
        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.
        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        // Release pairs with the acquire in start_writing(), so our reads
        // of the cell finish before the producer can reuse it
        state = m_state.fetch_sub(sub, std::memory_order_release) - sub;
        // Note: the masked value is 0x8 or 0x10 when set, never 1, so
        // test against 0
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) != 0) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times). Note that we don't bother swapping unless the active cell
            // holds a new value for us to read next.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};
```
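The bit tricks in `end_writing()` and `end_reading()` are compact but easy to misread. The following compile-time checks are my own, not part of the answer, and need C++14 for loops in `constexpr` functions. They confirm over all 32 possible low-bit states that each expression does what its comment says. The helper names are hypothetical:

```cpp
#include <cstdint>

// end_writing(): "full bit of the active cell, unless it is already set".
constexpr std::uint32_t flag_expr(std::uint32_t s) {
    return (8u << (s & 1)) ^ (s & (8u << (s & 1)));
}
constexpr std::uint32_t flag_plain(std::uint32_t s) {
    return ~s & (8u << (s & 1));
}

// end_reading(): "full bit of the non-active cell".
constexpr std::uint32_t other_full_bit(std::uint32_t s) {
    return 0x10u >> (s & 1);
}

// end_writing() swap target: flip the active bit and clear the full bit of
// the cell that becomes active.
constexpr std::uint32_t swapped(std::uint32_t s) {
    return (s ^ 0x1u) & ~(0x10u >> (s & 1));
}

// Every meaningful state fits in the low 5 bits, so check all 32 of them.
constexpr bool all_states_ok() {
    for (std::uint32_t s = 0; s < 32; ++s) {
        std::uint32_t active = s & 1, other = active ^ 1;
        if (flag_expr(s) != flag_plain(s)) return false;
        if (other_full_bit(s) != (8u << other)) return false;
        std::uint32_t t = swapped(s);
        if ((t & 1) != other) return false;                               // active cell flipped
        if (t & (8u << other)) return false;                              // new active cell not full
        if ((t & (8u << active)) != (s & (8u << active))) return false;  // other flag kept
        if ((t & 0x6u) != (s & 0x6u)) return false;                       // user count untouched
    }
    return true;
}
static_assert(all_states_ok(), "bit tricks match their descriptions");
```

`flag_plain` shows an arguably clearer way to write the same flag computation: `~state & (8 << (state & 1))`.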
Note that the semantics are such that the consumer can never read a given value twice, and a value it does read is always newer than the last value it read. It's also fairly efficient in memory usage (two buffers, like your original solution). I avoided CAS loops because they're generally less efficient than a single atomic operation under contention.
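For contrast, this is what a CAS loop looks like when it stands in for a single `fetch_add`. The helper `fetch_add_via_cas` is hypothetical, for illustration only. Every failed `compare_exchange_weak` means another thread modified the variable first, so the loop has to retry; a single `fetch_add`/`fetch_sub` never needs a retry loop at the C++ level:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper: emulates a.fetch_add(delta) with a CAS loop.
inline std::uint32_t fetch_add_via_cas(std::atomic<std::uint32_t>& a, std::uint32_t delta) {
    std::uint32_t expected = a.load(std::memory_order_relaxed);
    // On failure, compare_exchange_weak stores the current value into
    // 'expected', so the loop retries with fresh data.
    while (!a.compare_exchange_weak(expected, expected + delta,
                                    std::memory_order_acq_rel,
                                    std::memory_order_relaxed)) {
    }
    return expected;  // value before the addition, like fetch_add
}

// Example: behaves like fetch_add when there is no contention.
inline void cas_example() {
    std::atomic<std::uint32_t> a{5};
    std::uint32_t before = fetch_add_via_cas(a, 3);
    assert(before == 5 && a.load() == 8);
}
```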
If you decide to use the above code, I suggest you first write some comprehensive (threaded) unit tests for it, and proper benchmarks. I did test it, but only just barely. Let me know if you find any bugs :-)
My unit test:
```cpp
#include <cassert>
#include <thread>
// ...plus the ProducerConsumerDoubleBuffer template from above

int main() {
    ProducerConsumerDoubleBuffer<int> buf;
    std::thread producer([&]() {
        for (int i = 0; i != 500000; ++i) {
            int* item = buf.start_writing();
            if (item != nullptr) {  // Always true
                *item = i;
            }
            buf.end_writing();
        }
    });
    std::thread consumer([&]() {
        int prev = -1;
        for (int i = 0; i != 500000; ++i) {
            int* item = buf.start_reading();
            if (item != nullptr) {
                // Values must be strictly increasing: no repeats, no reordering
                assert(*item > prev);
                prev = *item;
            }
            buf.end_reading();
        }
    });
    producer.join();
    consumer.join();
}
```
As for your original implementation, I only looked at it cursorily (it's much more fun to design new stuff, heh), but david.pfx's answer seems to address that part of your question.