C++ 无锁模板对象池
-
23-10-2019 - |
题
它们存在吗?
*添加以澄清:
是否有现成的库,实现了无锁(即线程安全,内部可能使用自旋锁或其他轻量级同步机制)的对象池(Object Pool,参见 http://en.wikipedia.org/wiki/object_pool_pattern ),并且是用 C++ 模板编写的?
解决方案
我最终自己写了一个对象池,它线程安全、无锁,并且可以随多核扩展。基准测试结果如下:
在 Intel Core 2 Quad 2.4 GHz、Win7 x64 上使用 4 个线程时,它每秒可以完成 1660 万次借出/归还操作。
`
// Number of bytes assumed for one CPU cache line; the pool structures below are
// padded and aligned to this size.
#define CACHE_LINE_SIZE 64
// Alignment specifiers used by the pool types:
//   alignCache - align a type to a full cache line
//   alignArch  - align a variable to the pointer width of the target
// MSVC keeps the original __declspec(align(...)) spelling; every other compiler
// gets the standard C++11 alignas equivalent so the listing also builds there.
#if defined(_MSC_VER)
# define alignCache __declspec(align(CACHE_LINE_SIZE))
# ifdef _WIN64
# define alignArch __declspec(align( 8))
# else
# define alignArch __declspec(align( 4))
# endif
#else
# define alignCache alignas(CACHE_LINE_SIZE)
# define alignArch alignas(sizeof(void*))
#endif
class InterlockedFlag {
protected:
alignArch volatile unsigned int value;
public:
inline void set(unsigned int val) {
this->value = val;
}
inline unsigned int exchange(unsigned int val) {
return InterlockedExchange(&this->value,val);
}
};
// Pack the node to 1 byte so that `data` starts exactly sizeof(pointer) bytes
// after the beginning of the node. ObjectPool::returnObj relies on that offset
// to get from a T* back to the node that contains it.
#pragma pack(push,1)
/**
 * Singly linked node that carries one pooled object.
 * `next` links the node into a list's free chain; it is null while the node is
 * the tail of a chain or freshly constructed.
 */
template <typename T> struct ObjectPoolNode {
    ObjectPoolNode<T>* next;
    T data;
    ObjectPoolNode() : next(nullptr) { }
};
// Restore the packing that was active before the push. The previous
// `pack(pop,1)` form did not do that: it left 1-byte packing in force for every
// declaration that follows.
#pragma pack(pop)
/**
 * One free list of an ObjectPool, with separate flags for the consumer side
 * (`first`, taken from) and the producer side (`last`, appended to).
 *
 * The list allocates `count + 1` nodes and chains them together; the extra
 * node means `first` always points at a node even when nothing is available.
 * Every allocated node is remembered in `storage`, and destruction goes
 * through `storage` rather than through the chain.
 *
 * The pad arrays place each hot member at the start of its own
 * CACHE_LINE_SIZE-sized slot.
 */
template <typename T> struct alignCache ObjectPoolList {
    ObjectPoolList<T>* nextList;   // next list in the owning pool's ring (set by the pool)
    char pad1[CACHE_LINE_SIZE - sizeof(ObjectPoolList<T>*)];
    ObjectPoolNode<T>* first;      // head of the chain, advanced when an object is borrowed
    char pad2[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>*)];
    InterlockedFlag consumerLock;  // held while `first` is being advanced
    char pad3[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    ObjectPoolNode<T>* last;       // tail of the chain, extended when an object is returned
    char pad4[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>*)];
    InterlockedFlag producerLock;  // held while `last` is being extended
    char pad5[CACHE_LINE_SIZE - sizeof(InterlockedFlag)];
    ObjectPoolNode<T>** storage;   // every node this list allocated (count + 1 entries)
    char pad6[CACHE_LINE_SIZE - sizeof(ObjectPoolNode<T>**)];
    size_t available;              // number of objects currently considered borrowable
    size_t count;                  // number of objects this list was created with

    /**
     * Allocates and chains the nodes of the list.
     * @param count number of borrowable objects; `count + 1` nodes are created.
     */
    ObjectPoolList(size_t count)
        : nextList(nullptr),
          first(nullptr),
          consumerLock(false),
          last(nullptr),
          producerLock(false),
          storage(nullptr),
          available(count),
          count(count)
    {
        // Initialisers above follow declaration order, which is the order the
        // compiler actually runs them in.
        this->storage = new ObjectPoolNode<T>*[count+1];
        for(size_t i=0 ; i<count+1 ; i++) {
            this->storage[i] = new ObjectPoolNode<T>;
        }
        for(size_t i=0 ; i<count ; i++) {
            this->storage[i]->next = this->storage[i+1];
        }
        this->first = this->storage[0];
        this->last = this->storage[count];
    }

    // The list owns raw allocations; a member-wise copy would delete them twice.
    ObjectPoolList(const ObjectPoolList&) = delete;
    ObjectPoolList& operator=(const ObjectPoolList&) = delete;

    /**
     * Frees every node recorded in `storage`, then the array itself.
     * The counters are cleared only AFTER the loop: the loop bound depends on
     * `count`, and zeroing it first would free just storage[0] and leak the rest.
     */
    ~ObjectPoolList() {
        if(this->storage) {
            for(size_t i=0 ; i<this->count+1 ; i++) {
                delete this->storage[i];
            }
            delete[] this->storage;
            this->storage = nullptr;
        }
        this->count = 0;
        this->available = 0;
    }
};
template <typename T> class alignCache ObjectPool {
private:
ObjectPoolList<T>** lists;
char pad1[CACHE_LINE_SIZE - sizeof(ObjectPoolList<T>**)];
size_t available;
size_t listCount;
public:
ObjectPool(size_t count,size_t parallelCount = 0) {
this->available = count;
this->listCount = parallelCount;
if(this->listCount == 0) {
this->listCount = getSystemLogicalProcessor(); //default
}
this->lists = new ObjectPoolList<T>*[this->listCount];
for(size_t i=0 ; i<this->listCount ; i++) {
this->lists[i] = new ObjectPoolList<T>(count/this->listCount);
}
for(size_t i=0 ; i<this->listCount-1 ; i++) {
this->lists[i]->nextList = this->lists[i+1];
}
this->lists[this->listCount-1]->nextList = this->lists[0];
}
~ObjectPool() {
if(this->lists) {
for(size_t i=0 ; i<this->listCount ; i++) {
delete this->lists[i];
}
delete[] this->lists;
this->lists = NULL;
}
this->available = 0;
this->listCount = 0;
}
T* borrowObj() {
ObjectPoolList<T>* list = this->lists[0];
while( !list->available || list->consumerLock.exchange(true) ) {
if(!this->available) {
return NULL;
}
list = list->nextList;
}
if(list->first->next) {
ObjectPoolNode<T>* usedNode = list->first;
list->first = list->first->next;
list->available--;
this->available--;
list->consumerLock.set(false);
usedNode->next = nullptr;
return &usedNode->data;
}
list->consumerLock.set(false);
return NULL;
}
void returnObj(T* object) {
ObjectPoolNode<T>* node = (ObjectPoolNode<T>*)(((char*)object) - sizeof(ObjectPoolNode<T>*));
ObjectPoolList<T>* list = this->lists[0];
while( list->producerLock.exchange(true) ) {
list = list->nextList;
}
list->last->next = node;
list->last = node;
list->producerLock.set(false);
list->available++;
this->available++;
}
};
`
其他提示
最好的选择是看看 Boost.Pool,并为它编写一个无锁的分配器/互斥量接口。
鉴于有无锁的队列,我想说的是,如果不存在池,您肯定可以创建一个(几乎)无锁的池。
再结合经典的 tcmalloc 分配器(它可能会加锁,但会尽量避免),我认为您就很接近目标了。
不隶属于 StackOverflow