Question

After rethinking the design, and with some input from paddy, I came up with something like this, but I am unsure whether it is correct; it seems fine when I run it. The idea is that preallocated objects inherit from the following:

struct Node
{
    void* pool;
};

That way we inject into every allocated object a pointer to its pool, so that it can be released later. Then we have:

template<class T, int thesize>
struct MemPool
{
    T* getNext();
    void free(T* ptr);

    struct ThreadLocalMemPool
    {
        T* getNextTL();
        void freeTL();

        int size;
        vector<T*> buffer;
        vector<int> freeList;
        int freeListIdx;
        int bufferIdx;
        ThreadLocalMemPool* nextTlPool; //within a thread's context a linked list
    };

    int size;
    threadlocal ThreadLocalMemPool* tlPool; //one of these per thread
};

So basically I say MemPool<Cat, 100> and it gives me a mempool which, for every thread that calls getNext() on it, instantiates a thread-local mempool. Internally I round sizes to the nearest power of two for easy modulo (which I'll omit for simplicity). Because getNext() is local to each thread, it does not require locking, and I try to use atomics for the freeing part as follows:

T* ThreadLocalMemPool::getNextTL()
{
    int iHead = ++bufferIdx % size;
    int iTail = freeListIdx % size;

    if (iHead != iTail)  // If head reaches tail, the free list is empty.
    {
        int & idx = freeList[iHead];
        while (idx == DIRTY) {}
        return buffer[idx];
    }
    else
    {
        bufferIdx--; //we will recheck next time
        if (nextTlPool)
            return nextTlPool->getNextTL();
        else
            //set nextTlPool to a new ThreadLocalMemPool and return getNextTL() from it..
    }
}

void ThreadLocalMemPool::free(T* ptr)
{
    //the outer struct handles calling this in the right ThreadLocalMemPool

    //we compute the index of this object within the pool by subtracting
    //the address of the first element in this pool's buffer from its address
    int idx = computeAsInComment(ptr);

    int oldListIdx = atomic_increment_returns_old_value(freeListIdx);
    freeList[oldListIdx % size] = idx;
}

Now, the idea is that freeListIdx will always trail behind bufferIdx in a pool, because (assuming correct usage) you can't free more than you have allocated. Calls to free synchronize the order in which they return buffer indices to the free list, and getNext will pick these up as it cycles back. I have been thinking about it for a bit and don't see anything semantically wrong with the logic. Does it seem sound, or is there something subtle that could break it?

Solution

Thread safety requires locking. If you want to relax that, you need the constraint that only one thread ever uses the pool. You can extend this to two threads if you use the circular free-list that I'll describe below, with the proviso that one thread is responsible for allocation and the other for deallocation.

As for using a vector without any other management, that is a bad idea: as soon as the pool starts getting fragmented, your allocations take a hit.

A nice way to implement this is to just allocate a large block of T. Then make a circular queue large enough to point to each of these blocks. That is your 'free-list'. You might just choose to use indices. If you limit each pool to 65536 items, you can choose unsigned short to save space (actually, it's 65535, to allow efficient circular queue management).

By using a circular queue, you allow constant-time allocation and deallocation regardless of fragmentation. You also know when your pool is full (i.e. the free-list is empty) and you can create another pool. Obviously, when you create a pool you need to fill the free-list.

So your class would look sort of like this:

template<class T, size_t initSize>
class MemPool
{
    vector<T> poolBuffer;              // The memory pool
    vector<unsigned short> freeList;   // Ring-buffer (indices of free items)
    unsigned short nHead, nTail;       // Ring-buffer management
    int nCount;                        // Number of elements in ring-buffer
    MemPool<T,initSize> *nextPool;     // For expanding memory pool

    // etc...
};
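To make the ring-buffer idea concrete before any threading is added, here is a minimal single-threaded sketch. The name RingPool, the release() and available() members, and the use of a bit mask for wrap-around are my own assumptions for illustration; it also assumes T is default-constructible and that the pool size is a power of two.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-threaded sketch of a fixed-size pool whose free-list is a ring
// buffer of indices. RingPool and its member names are hypothetical.
template <class T, std::size_t N>
class RingPool {
    static_assert(N > 0 && (N & (N - 1)) == 0, "N must be a power of two");
    static_assert(N <= 32768, "indices must fit in unsigned short");

    std::vector<T> poolBuffer;             // the pooled objects
    std::vector<unsigned short> freeList;  // ring buffer of free indices
    std::size_t nHead = 0;                 // next slot to allocate from
    std::size_t nTail = 0;                 // next slot to release into
    std::size_t nCount = N;                // free indices currently queued

public:
    RingPool() : poolBuffer(N), freeList(N) {
        // A new pool starts with every index on the free-list.
        for (std::size_t i = 0; i < N; ++i)
            freeList[i] = static_cast<unsigned short>(i);
    }

    // Returns nullptr when the pool is exhausted (the free-list is empty).
    T* alloc() {
        if (nCount == 0) return nullptr;
        --nCount;
        unsigned short idx = freeList[nHead];
        nHead = (nHead + 1) & (N - 1);     // power-of-two modulo
        return &poolBuffer[idx];
    }

    // True if mem points at an element of this pool's buffer.
    bool owns(const T* mem) const {
        return mem >= &poolBuffer.front() && mem <= &poolBuffer.back();
    }

    void release(T* mem) {
        assert(owns(mem) && nCount < N);
        // The pointer difference within the buffer is the element index.
        freeList[nTail] = static_cast<unsigned short>(mem - &poolBuffer.front());
        nTail = (nTail + 1) & (N - 1);
        ++nCount;
    }

    std::size_t available() const { return nCount; }
};
```

With RingPool<int, 4>, four calls to alloc() succeed, the fifth returns nullptr, and releasing one pointer makes exactly that element available again. Both operations are constant-time no matter in which order items were released.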

Now, for the locking. If you have access to atomic increment and decrement instructions and are reasonably careful, you can maintain the free-list with thread-safety. The only mutex-style locking required is when you need to allocate a new memory pool.

I've altered my original thinking. You kinda need two atomic operations and you need a reserved index value (0xffff) to spin on for non-atomic operations on the queue:

// I'll have the functions atomic_incr() and atomic_decr().  The assumption here
// is that they do the operation and return the value as it was prior to the
// increment/decrement.  I'll also assume they work correctly for both int and
// unsigned short types.
unsigned short atomic_incr( unsigned short & );
int atomic_incr( int & );
int atomic_decr( int & );
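If you have C++11 available, one possible way to get these helpers is on top of std::atomic. This is only a sketch under that assumption: the parameter types become std::atomic<...>& rather than plain references, and atomic_helpers_demo() is a hypothetical function added just to show the "returns the old value" behaviour.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical implementations of the helpers, built on std::atomic.
// fetch_add/fetch_sub return the value held *before* the operation.
inline unsigned short atomic_incr(std::atomic<unsigned short>& v) {
    return v.fetch_add(1);   // unsigned arithmetic wraps from 65535 to 0
}
inline int atomic_incr(std::atomic<int>& v) { return v.fetch_add(1); }
inline int atomic_decr(std::atomic<int>& v) { return v.fetch_sub(1); }

// Small self-check of the "old value" convention.
inline void atomic_helpers_demo() {
    std::atomic<int> count{1};
    assert(atomic_decr(count) == 1);   // old value returned; count is now 0
    assert(atomic_incr(count) == 0);   // the "undo" step; count is 1 again
}
```

With these, the counters in the pool would be declared as std::atomic<int> and std::atomic<unsigned short> instead of plain integers.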

So the allocation goes something like:

T* alloc()
{
    // Check the queue size.  If it's zero (or less) we need to pass on
    // to the next pool and/or allocate a new one.
    if( nCount <= 0 ) {
        return alloc_extend();
    }

    int count = atomic_decr(nCount);
    if( count <= 0 ) {
        T *mem = alloc_extend();
        atomic_incr(nCount);     // undo
        return mem;
    }

    // We are guaranteed that there is at least 1 element in the list for us.
    // This will overflow naturally to achieve modulo by 65536.  You can only
    // deal with queue sizes that are a power of 2.  If you want 32768 values,
    // for example, you must do this: head &= 0x7fff;
    unsigned short head = atomic_incr(nHead);

    // Spin until the element is valid (use a reference)
    unsigned short & idx = freeList[head];
    while( idx == 0xffff );

    // Grab the pool item, and blitz the index from the queue
    T * mem = &poolBuffer[idx];
    idx = 0xffff;

    return mem;
}

The above uses a new private member function:

T * alloc_extend()
{
    if( nextPool == NULL ) {
        acquire_mutex_here();
        if( nextPool == NULL ) nextPool = new MemPool<T,initSize>;
        release_mutex_here();
        if( nextPool == NULL ) return NULL;
    }
    return nextPool->alloc();
}
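The check-lock-check pattern in alloc_extend() reads nextPool outside the mutex. In C++11 terms, one way to make that read well-defined is to hold the pointer in a std::atomic. The sketch below shows only the chaining logic; PoolNode, extendMutex and next_or_create() are hypothetical names, and the real pool would call alloc() on the returned node.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for a pool; only the chain-extension logic is shown.
struct PoolNode {
    std::atomic<PoolNode*> nextPool{nullptr};
    std::mutex extendMutex;

    // Returns the next pool in the chain, creating it on first use.
    PoolNode* next_or_create() {
        PoolNode* next = nextPool.load(std::memory_order_acquire);
        if (next == nullptr) {
            std::lock_guard<std::mutex> lock(extendMutex);
            next = nextPool.load(std::memory_order_relaxed);
            if (next == nullptr) {               // re-check under the lock
                next = new PoolNode;
                nextPool.store(next, std::memory_order_release);
            }
        }
        assert(next != nullptr);
        return next;
    }

    ~PoolNode() { delete nextPool.load(); }      // frees the rest of the chain
};
```

The mutex is only touched while the chain is being extended. Once nextPool is set, every later call takes the lock-free path.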

When you want to free:

void free(T* mem)
{
    // Find the right pool to free from.
    if( mem < &poolBuffer.front() || mem > &poolBuffer.back() )
    {
        if( nextPool ) nextPool->free(mem);
        return;
    }

    // You might want to maintain a bitset that indicates whether the memory has
    // actually been allocated so you don't corrupt your pool here, but I won't
    // do that in this example...

    // Work out the index.  Hope my pointer arithmetic is correct here.
    unsigned short idx = (unsigned short)(mem - &poolBuffer.front());

    // Push index back onto the queue.  As with alloc(), you might want to
    // use a mask on the tail to achieve modulo.
    unsigned short tail = atomic_incr(nTail);
    freeList[tail] = idx;

    // Don't need to check the list size.  We assume everything is sane. =)
    atomic_incr(nCount);
}

Notice I used the value 0xffff, effectively as a NULL. The setting, clearing and spinning on this value are there to prevent a race situation. You cannot guarantee that it is safe to leave old data in the queue when multiple threads may be calling free while you have other threads calling alloc. Your queue will be cycling through, but the data in it might not be set yet.
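To illustrate that handshake on a single queue slot, here is a rough sketch using std::atomic. The names kEmpty, put_index and take_index are hypothetical. I've also swapped the separate "read, then blitz" steps for one atomic exchange, which is a variation on the code above and not what it does. Treat it as an illustration of the idea, not a complete lock-free queue.

```cpp
#include <atomic>
#include <cassert>

// Reserved value meaning "no index has been written to this slot yet".
constexpr unsigned short kEmpty = 0xffff;

// free() side: publish an index into a slot that alloc() has already cleared.
inline void put_index(std::atomic<unsigned short>& slot, unsigned short idx) {
    assert(idx != kEmpty);                       // 0xffff is reserved
    slot.store(idx, std::memory_order_release);
}

// alloc() side: spin until a real index is visible. The exchange reads the
// slot and marks it empty in one step, so a later lap around the ring
// cannot pick up the stale value.
inline unsigned short take_index(std::atomic<unsigned short>& slot) {
    unsigned short idx;
    while ((idx = slot.exchange(kEmpty, std::memory_order_acq_rel)) == kEmpty) {
        // spin: a freeing thread has claimed this slot but not written it yet
    }
    return idx;
}
```

Using an atomic type for the slot also stops the compiler from hoisting the load out of the spin loop, which it is allowed to do with a plain unsigned short reference.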

Instead of indices, of course, you could just use pointers. But that is 4 bytes (or 8 bytes on a 64-bit application) and the memory overhead might not be worth it, depending on the data size you are pooling. Personally, I would use pointers, but for some reason it seemed easier to use indices in this answer.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow