一、项目介绍
简化版高并发内存池是基于Google 的一个开源项目TCMalloc,TCMalloc 是 Google 开发的内存分配器,全称为Thread-Caching-Malloc,即线程缓存的malloc,实现高效的多线程内存管理。
1.1、项目知识要求
这个项目涉及C/C++、数据结构(链表、哈希桶)、操作系统的内存管理、单例模式、多线程、互斥锁等。
二、内存池
2.1、池化技术
所谓的**“池化技术”**就是向系统申请一个过量的资源,然后自己管理,类似的现象就是比如向家里人要生活费,索要生活费可大致分为两种,一种就是每天需要多少生活费,就问家里人要多少生活费,另外一种就是,一次要一周的生活费自己管理。池化技术就类似后者,可以避免每次都问系统申请资源,提高程序运行效率。
2.2、内存池
内存池百度百科
内存池就是指程序先从操作系统中申请一块足够大的内存,此后,当程序需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存时,并不是直接将内存返回给操作系统,而是返回内存池。当程序退出时,内存池将之前申请的内存释放。
2.3、内存池主要解决什么问题
1、效率问题
针对效率问题,如前面说的生活费问题,可以避免多次向操作系统申内存。
2、内存碎片的问题
a、内存碎片是怎么产生的?
如上图所示,vector向系统申请了256Byte的空间,然后又释放给系统,list向系统申请128Byte的空间,然后也释放了,我们现在又三百多Byte的空间,现在想申请三百多Byte的空间,但是会申请失败,因为系统中的那三百多Byte的空间是不连续的,所以哪些空间就成为了内存碎片。
三、定长内存池
定长内存池就是空间大小固定的内存。
定长内存池之所以高效,是因为它可以切除固定大小的内存供线程使用。
3.1如何设计一个定长内存池?
3.1.1定长内存池中有什么?
定长内存池包括:一个大块内存(内存池),一个用于链接释放空间的自由链表,用于计算大块内存在切分后剩余空间大小。
private:
//指向大块内存的指针
char* _memory=nullptr;//为什么要用char?因为一个char就是一个在字节
void* _freeList = nullptr;//还回过程中链接的自由链表的指针
size_t _remainBytes = 0;//大块内存剩余的字节数
3.1.2创建内存池
首先直接向系统申请一大块内存,用来做内存池
申请完后就可以使用该内存池,当我们释放内存时,该内存就会被挂在自由链表中(回收内存链表),所以,我们应该首先判断自由链表中有没有已经回收了的内存,如果有,我们优先使用该链表中挂的内存,否则直接在内存池中申请内存。
T* New()
{
T* obj;
//优先使用_freelist中的空间
if (_freeList)
{
void* next = *(void**)_freeList;//存储地址,下一个内存块的地址——》指向下一个内存块
obj =(T*) _freeList;
_freeList = next;
}
else
{
//剩余内存不够一个人对象大小时,重新开一个大空间
if (_remainBytes <sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
//直接调用系统
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
/*T* obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= sizeof(T);*/
}
obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= objSize;
}
//因为这里只是开了空间,并没有初始化,所以需要调用函数的我初始化
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
3.1.3释放内存
当内存不用之后,直接用链表将释放的内存块挂起来
我们这里将第一块内存中的一部分用来存放下一内存的地址,这样就可以像链表一样将内存块挂起来
//释放内存
void Delete(T* obj)
{
//显示调用析构函数
obj->~T();
//if (_freeList == nullptr)
//{
// _freeList = obj;
// //*(int*) 对int*解引用就会指向四个字节
// //*(void**)在32或者64位下分别是4字节和8字节,刚好可以存一个地址
// *(void**)obj = nullptr;//首先先让这个返回的自由链表的前四个字节指向nullptr
//}
//else
//{
// *(void**)obj == _freeList;
// _freeList = obj;
//}
//直接头插就可以,不需要去判断是否为空
*(void**)obj = _freeList;
_freeList = obj;
}
3.2整体代码
#pragma once
#include"Common.h"
//定长内存池
template<class T>
class ObjectPool
{
public:
//申请内存
T* New()
{
T* obj;
//优先使用_freelist中的空间
if (_freeList)
{
void* next = *(void**)_freeList;//存储地址,下一个内存块的地址——》指向下一个内存块
obj =(T*) _freeList;
_freeList = next;
}
else
{
//剩余内存不够一个人对象大小时,重新开一个大空间
if (_remainBytes <sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
//直接调用系统
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
/*T* obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= sizeof(T);*/
}
obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;
_remainBytes -= objSize;
}
//因为这里只是开了空间,并没有初始化,所以需要调用函数的我初始化
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//释放内存
void Delete(T* obj)
{
//显示调用析构函数
obj->~T();
//if (_freeList == nullptr)
//{
// _freeList = obj;
// //*(int*) 对int*解引用就会指向四个字节
// //*(void**)在32或者64位下分别是4字节和8字节,刚好可以存一个地址
// *(void**)obj = nullptr;//首先先让这个返回的自由链表的前四个字节指向nullptr
//}
//else
//{
// *(void**)obj == _freeList;
// _freeList = obj;
//}
//直接头插就可以,不需要去判断是否为空
*(void**)obj = _freeList;
_freeList = obj;
}
private:
//指向大块内存的指针
char* _memory=nullptr;//为什么要用char?因为一个char就是一个在字节
void* _freeList = nullptr;//还回过程中链接的自由链表的指针
size_t _remainBytes = 0;//大块内存剩余的字节数
};
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
//测试用例
void TestObjectPool()
{
//申请释放的轮次
const size_t Rounds = 5;
//每轮申请释放多少次
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j0 = 0; j0 < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost toime:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
3.2.1测试结果
四、高并发内存池整体结构框架设计
在实现内存池时,我们一般需要考虑到效率问题和内存碎片问题,但是对于高并发内存池来说,我们不仅需要考虑以上问题,还需要考虑在多线程环境下的锁竞争问题
高并发内存池整体框架由以下几个部分组成:thread cache(线程缓存)、central cache(中心缓存)、page cache(页缓存)。
4.0各个部分的主要作用
- thread cache主要解决的是锁竞争的问题,每一个线程独享自己的thread cache,当thread cache中有内存时,该线程就不会和其他线程进行竞争,每个线程只需要在自己的thread cache中申请内存就好。
- central cache主要起到一个居中调度的作用,每一个线程的thread cache需要从central cache中获取内存,如果thread cache中的内存有很多的时候,就会将内存还给central cache,其作用类似于中枢,起到调节的作用,所以被称为中心缓存。
- page cache就负责提供以页为单位的大块内存,当central cache需要内存时,就会向page cache申请,而当page cache中没有足够多的内存时,就会直接向系统进行申请。
4.1thread cache(线程缓存)
a、线程缓存就是每个线程独有一个线程缓存空间,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,这样不经可以保证线程与线程之间的独立,并且可以保证并发高效。
b、定长内存池只支持固定大小的内存块的申请,因此定长内存池只需要一个自由链表用来管理释放回来的内存块(固定大小的内存块)。如果我们想要申请和释放不同大小的内存块,那么就需要多个自由链表来管理释放回来的内存块,因此thread cache实际上是一个哈希桶的结构,每个桶上挂着不同大小的自由链表。
c、我们需要设计所有大小的内存块吗?不是的,如果我们要设计所有大小的内存块,我们就需要20多万个自由链表,这样无疑是一个很大的工程,并且只是用来存储这些自由链表的头指针就要消耗大量的内存。所有我们采用按照某种规则对这些字节数进行对齐,这里我们采用的是8字节对齐规则(因为在64位下,8字节刚好可以存储头指针),比如我们申请1~8字节大小的内存时,thread cache 直接就给我们8字节,如果申请9~16字节时,就直接给我们16字节大小的内存块。
但是当在多线程情况下,thread cache 可能会同时去central cache 申请内存,此时就会涉及线程安全的问题,因此在访问central cache时需要加锁,但是central cache实际上是一个哈希桶的结构,只有当多个线程同时访问一个桶的时候才会加锁,所以这里的锁竞争问题不是很激烈。
4.1.1thread cache 包含的内容
a、插入:push---------》头插
void push(void* obj)
{
//头插
//*(void**)obj = _freeList;//先将obj强转成void**,这样就会取到前四或者八个个字节(地址)
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
b、删除(弹出):pop--------》头删
void* pop()
{
//头删
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
threadcache哈希桶映射对齐规则
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率。
对齐和映射相关函数的编写:
我们需要提供两个对应的函数,分别用于获取某个大小字节数对齐后的字节数,以及该字节数对用的哈希桶的下标(当释放该内存时,方便将该内存块归还)
//管理对齐和映射等关系
class SizeClass
{
public:
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes);
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes);
};
获取对齐后的字节数:我们需要先判断该申请的内存块位于哪一个区间,然后再通过调用子函数进行进一步的处理。
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
//一般写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
size_t alignSize = 0;
if (bytes%alignNum != 0)
{
alignSize = (bytes / alignNum + 1)*alignNum;
}
else
{
alignSize = bytes;
}
return alignSize;
}
//大佬关于字节对齐的写法
//位运算写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1)&~(alignNum - 1));
}
这里对按照位运算写法进行讲解
假设我们想要10字节大小的内存块,对齐后就是16字节,这是怎么进行计算的呢?
1、alignNum - 1:(8-1=7:00111)
2、bytes + alignNum - 1:(10+7=10001)
3、~(alignNum - 1):(00111按位取反11000)
4、((bytes + alignNum - 1)&~(alignNum - 1)):(10001&11000=10000)
即,得到对齐后的字节数是16
当我们知道对齐后的字节数后,我们还要确定该字节对应的哈希桶下标,我们也是先判断盖子结束属于哪一个区间,然后在通过调用子函数确定哈希桶下标。
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
//每个区间有多少个自由链表
static size_t groupArray[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + groupArray[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
}
else
{
assert(false);
return -1;
}
}
//一般写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
size_t index = 0;
if (bytes%alignNum != 0)
{
index = bytes / alignNum;
}
else
{
index = bytes / alignNum - 1;
}
return index;
}
//大佬写法
//位运算写法
//注意:这里传入的不是对齐后的字节数,而是传入对齐数规则中按照几位对齐的数(这里是8),将8写成2的n次方中的n=3值进行传入.
static inline size_t _Index(size_t bytes, size_t alignShift)
{
return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}
我们这里还是按照10字节进行计算
最后结果是1,是因为我们的哈希桶下标是从0开始的。
申请对象
//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
class ThreadCache
{
public:
//申请内存对象
void* Allocate(size_t size);
private:
FreeList _freeLists[NFREELISTS]; //哈希桶
};
//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);
}
}
thread cacheTLS无锁访问
我们这里知道,每个线程都独享一个自己的thread cache,那么如何来创建这个thread cache呢?我们不能将thread cache设计为全局的,因为如果设计成全局的话,每一个线程就都可以访问该thread cache,如此一来,我们就需要用锁进行控制,但是这和我们当初设计thread cache 的初衷就矛盾了。要实现每个线程只能访问自己的thread cache时,我们就需要用到线程局部存储TLS(Thread Local Storage),使用该存储方法的变量在他所在的线程内是全局可以访问的,但是不能被其他线程访问,这样就保持了数据的线程独立性。
//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
4.2 central cache(中心缓存)
中心缓存是所有线程共享的一块空间,thread cache是按需从central cache中获取对象。当该对象结束释放空间,central cache回收thread cache中的对象。central cache是存在竞争的,所以这里存取对象是需要加锁的。 这里使用的是哈希桶,当多个线程访问同一个桶的时候才会加锁。
虽然central cache的结构也是哈希桶结构与thread cache结构上不同的地方在于,central cache的每个桶中挂的是一个一个的span,而thread cache上挂的是一个一个内存块。
每一个span管理的都是以页为单位的大块内存,每个桶中的若干个span是用双链表的形式链接起来的,并且每一个span下挂着固定大小内存块的自由链表。
central cache的结构设计
a、判断页号的取值范围
每个程序在不同位数的机器上运行起来的时候,他的进程地址空间是不一样的,在32位下是2^32,在64位机器上是 2的64次方。所以在不同的机器上,页号的范围是不一样的。
所以我们先要判断该机器是多少位
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//linux
#endif
注意:因为在64位下_WIN64和_WIN32都有定义,在32位下只有_WIN32有定义,所以,我们需要判断_WIN64是否有定义,再判断_WIN32是否有定义。
b、span的结构
在central cache中每个桶挂的都是一个一个span,span是一个管理以页为单位的大块内存(我们将span实现成双链表结构)。
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
};
_pageId:对于span管理的以页为单位的大块内存,我们需要知道这块内存具体是在那个位置,方便之后归还后进行page cache进行前后页的合并,因此span结构中会记录所管理大块内存起始页的页号。
_n :用于记录该span管理页的个数。
_useCount:用于记录用了多少个内存块,当值为0时,代表当前的span切分出去的内存块对象已经全部被还回来了,此时,就可以将该span归还给page cache。
_freeList :每个span都有一个自由链表,用来连接大小固定的内存块。
_next、_prev:双链表结构,用来将某一span直接从链表中移除,归还给page cache。如果采用单链表结构时,当要移除时,我们需要知道该span的前一个节点。
c、带头双向循环链表
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head); //不能删除哨兵位的头结点
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
d、central cache结构
central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储就是我们上面定义的双链表结构。
class CentralCache
{
public:
//...
private:
SpanList _spanLists[NFREELISTS];
};
central cache的核心
a、central cache的实现方式
在整个进程中central cache只有一个,对于这种只能创建一个对象的类,我们可以将其设置成单例模式。
对于单例模式,我们都知道单例模式可以分为饿汉模式和懒汉模式
我们只需要提供一个可以访问全局的访问点即可,所以我们使用饿汉模式就行,因为懒汉模式比较复杂。
#pragma once
#include"Common.h"
//单例模式
class CentralCache
{
public:
static CentralCache* GenInstance()
{
return &_sInst;
}
Span* GetOneSpan(SpanList& list, size_t byte_size);
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
//将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanList[NFREELIST];//保证该线程在thread cache中映射的位置与在central cache中映射的位置相同
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
为了保证central cache类只创建一个对象,我们就必须将其的构造函数二号拷贝构造函数设置为私有,或者按照C++11的规则在函数声明后加上“=delete”进行修饰。此外我们还需要一个central cache类型的静态的成员变量,当程序运行起来后我们就立马创建对象,在此之后就只有这一个单例了。
CentralCache CentralCache::_sInst;
b、慢开始反馈调节算法
当thread cache向central cache申请空间的时候,这个时候我们应该给多少空间呢?给的太多会造成浪费,给的太少效率会变得低下,所以我们采用一种慢开始的反馈算法用来控制这个大小。
//管理对齐和映射等关系
class SizeClass
{
public:
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
};
针对上面的代码,当我们申请小对象时,他会一次性给我们512个对象大小,但是我们不需要者们多,所i基于这个原因,我们在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的_maxSize。此时当thread cache申请对象时,我们会比较 _maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一(或者加其他数或着乘以2都可以)。因此,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize增加了,最终就会申请到两个。直到该自由链表中_maxSize的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。(这有点像网络中拥塞控制的机制)
//管理切分好的小对象的自由链表
class FreeList
{
public:
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
};
c、从中心缓存获取对象
当我们向central cache申请对象时,我们要先通过慢开始反馈算法计算申请对象的个数,然后再向centra cache进行申请。会出现以下两种情况:1、申请一个,2、申请多了。
当我们申请多了之后除了第一个对象返回之外,还需要将剩下的对象挂在thread cache对应的哈希桶中。
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//1、最开始不会以西向central cache一次批量要太多,因为要太多了可能用不完
//2、如果一直要size大小的内存需求,那么batchNum就会不断增长,直到上限
//3、size越大,一次向central cache要的batchNum就越小
//4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeList[index].MaxSize(),SizeClass::NumMoveSize(size));
void* start = nullptr;
void* end = nullptr;
if (_freeList[index].MaxSize() = batchNum)
{
_freeList[index].MaxSize() += 1;
}
size_t actualNum=CentralCache::GenInstance()->FetchRangeObj(start,end,batchNum,size);
assert(actualNum>0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeList[index].PushRange(NextObj(start), end, actualNum-1);
return start;
}
}
d、从central cache获取一定数量的对象
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanList[index]._mtx.lock();
Span* span = GetOneSpan(_spanList[index],size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象
//如果不够batchNum,有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
_spanList[index]._mtx.unlock();
return actualNum;
}
e、插入一段范围的对象到自由链表中
当我们获取的对象数大于一时,我们需要将剩下的对象插入thread cache中对应的哈希桶中。
//管理切分好的小对象的自由链表
class FreeList
{
public:
//插入一段范围的对象到自由链表
void PushRange(void* start, void* end)
{
assert(start);
assert(end);
//头插
NextObj(end) = _freeList;
_freeList = start;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
};
4.3page cache(页缓存)
页缓存是以页为单位i进行存储及分配的,当central cache没有内存对象时,从page cache分配一定数量的page,分配给central cache。当对象回收后,page chche会回收符合条件的central cache的对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
page cache与central cache相同之处:都是哈希桶结构,并且每个哈希桶中都挂着一个一个span,这些span也是按照双链表的结构链接起来的。
page cache与central cache不同之处:page cache的映射规则不是对齐规则,而是采用的是直接定址法,比如1号桶挂的都是1页的span……。page cache不会将span切分成更小的内存块给central cache,而是直接将该span分配给central cache。
- 假设central cache需要5页的内存,就去page cache中第5号桶里拿数据,5号桶也是一个自由链表,链接这spanlist,每一个span都是5页内存,大小。需要去遍历这个spanlist,看是否有空间。如果没有,进入步骤2。
- 则Pagc cachc去6号桶去给ccntral cachc拿内存,如果拿到了6页的内存,就把6页的内存,切分成1页和5页的内存,将5页的内存返给ccntral cachc,1页的内存挂到1号桶的自由链表中。如果6号桶没有内存就向7号桶拿内存7~128号桶都没有内存,就需要向系统中请内存.
- 一次向系统申请128页的内存,挂到128号桶中。
- page cache再从5号桶开始遍历,一直遍历到128号桶,此时128号桶里有内存,将内存切分成5页和123页,5页分给central cache,123页挂到123号桶中。
page cache的实现
当central cache向page cache的多个桶申请的时候,会存在线程安全的问题,因此在访问page cache时我们需要加锁,但是我们不能申请用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。也就是说可能存在同时访问多个桶的情况,如果采用桶锁,就会存在大量的加锁解锁,会导致效率低下。所以我们使用一个大锁,直接将page cache给锁起来。
此外,page cache在整个进程中也是只能存在一个,所以我们将其设置成单例模式。
#pragma once
#include"Common.h"
#include"ObjectPool.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
//释放空闲span回到pagecache,合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//获取一个k页的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
std::unordered_map<PAGE_ID, Span*>_idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
PageCache PageCache::_sInst;
从page cache获取span
thread cache向central cache申请对象时,central cache需要先从对应的哈希桶中获取到一个非空的span,然后从这个非空的span中取出若干对象返回给thread cache。那central cache到底是如何从对应的哈希桶中,获取到一个非空的span的呢?
首先当然是先遍历central cache对应哈希桶当中的双链表,如果该双链表中有非空的span,那么直接将该span进行返回即可。为了方便遍历这个双链表,我们可以模拟迭代器的方式,给SpanList类提供Begin和End成员函数,分别用于获取双链表中的第一个span和最后一个span的下一个位置,也就是头结点。
//带头双向循环链表
class SpanList
{
public:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
如果当span为空或者没有时,我们就需要向page cache申请内存块。我们需要根据具体对象计算出要申请几页的内存块,不够一页的按照一页。
//管理对齐和映射等关系
class SizeClass
{
public:
//central cache一次向page cache获取多少页
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限
size_t nPage = num*size; //num个size大小的对象所需的字节数
nPage >>= PAGE_SHIFT; //将字节数转换为页数
if (nPage == 0) //至少给一页
nPage = 1;
return nPage;
}
};
其中,PAGE_SHIFT代表的是将字节数转换为页数。
//页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
注意:当我们申请到若干页的span时,我们需要将该span切分成对应大小的对象挂在对应的自由链表中,我们需要先计算出span的起始地址,用span的起始页乘以一页的大小就可以的到span的起始地址,然后再用页数乘以一页大小,我们就可以得到该span所管理的内存块的大小了,用起始地址加上内存块大小,我们就可以得到这个span的结束位置,这样我们就可以将该span切分成对应大小的对象了,然后尾插到自由链表中。
尾插是为了让这些对象看起来是按照链式结构链接起来的,但是在物理上是连续的。这样可以提高该线程的CPU缓存利用率。
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//查看当前的spanlist中是否还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
//先把centrai cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//走到这里说明没有空闲span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_pageMtx.unlock();
//对获取的span进行切分,不需要加锁,因为这个时候其他线程访问不到这个span
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n<<PAGE_SHIFT;
char* end = start + bytes;
//把大块内存切成自由链表链接起来
//1、先切一块下来做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = start;//tail=NextObj(tail);
start += size;
}
NextObj(tail) = nullptr;
//切好span以后,需要把span挂到桶里面的时候,再加锁
list._mtx.unlock();
list.PushFront(span);
return span;
}
我们需要实现一个接口用于将一个span插入到central cache对应的哈希桶中的双链表中,我们这里选择头插,这样当central cache下一次从该双链表中获取非空的span时,就可以直接找到。
//带头双向循环链表
class SpanList
{
public:
void PushFront(Span* span)
{
Insert(Begin(), span);
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
获取一个K页的span
因为page cache是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该直接先去找page cache的第k号桶,如果第k号桶中有span,那我们直接头删一个span返回给central cache就行了。所以我们这里需要再给SpanList类添加对应的Empty和PopFront函数。
//带头双向循环链表
class SpanList
{
public:
bool Empty()
{
return _head == _head->_next;
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
- 假设central cache需要5页的内存,就去page cache中第5号桶里拿数据,5号桶也是一个自由链表,链接这spanlist,每一个span都是5页内存,大小。需要去遍历这个spanlist,看是否有空间。如果没有,进入步骤2。
- 则Pagc cachc去6号桶去给ccntral cachc拿内存,如果拿到了6页的内存,就把6页的内存,切分成1页和5页的内存,将5页的内存返给ccntral cachc,1页的内存挂到1号桶的自由链表中。如果6号桶没有内存就向7号桶拿内存7~128号桶都没有内存,就需要向系统中请内存.
- 一次向系统申请128页的内存,挂到128号桶中。
- page cache再从5号桶开始遍历,一直遍历到128号桶,此时128号桶里有内存,将内存切分成5页和123页,5页分给central cache,123页挂到123号桶中。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k <= NPAGES);
//大于128 page的直接向堆申请
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;
return span;
}
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan= _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查后面的桶里面有没有span,如果有可以把大的进行切分
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
//在nSpan的头部切一个k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩余页数挂到对应的桶号上
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nspan的首尾页号跟nspan映射,方便pagecache回收内存时进行合并查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
//返回kSpan
return kSpan;
}
}
//走到这里说明没有大页的span
//这是就要去堆要一个128页的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//递归调用
return NewSpan(k);
}
4.4thread cache回收内存
当某个线程申请的对象不用了的时候,就可以将其释放给thread cache,然后thread cachea就将该对象插入到对应的哈希桶中的自由链表中即可。
但是随着线程的不断释放,所对应的自由链表的长度也会相应的越来越长,就到导致这些内存块堆积在某一thread cache中,就会造成浪费,所以我们需要将该内存还给central cache,这样当其他的thread cache想申请内存时,就可以继续向central cache申请。
这里我们设置成如果thread cache某个桶中的自由链表的长度超过它一次批量申请的对象的个数,那么此时我们就要把该自由链表当中的内存还给central cache。
void* ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//找对映射的自由链表桶,插入进去
size_t index = SizeClass::Index(size);
_freeList[index].push(ptr);
//当链表长度大于一次批量申请的内存时就开始还一段list给central cache
if (_freeList[index].Size() >= _freeList[index].MaxSize())
{
ListTooLong(_freeList[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GenInstance()->ReleaseListToSpans(start, size);
}
从上述代码我们可以看出,我们需要PopRange函数用来从自由链表中取出指定的一段对象,而且我们还需要增加一个可以用来记录当前自由链表中对象个数的值,当我们向自由链表插入或者删除对象的时候,我们都要更新这个值。
//管理切分好的小对象的自由链表
class FreeList
{
public:
void push(void* obj)
{
//头插
//*(void**)obj = _freeList;//先将obj强转成void**,这样就会取到前四或者八个个字节(地址)
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void PushRange(void* start, void* end,size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* pop()
{
//头删
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList=nullptr;
size_t _maxSize = 1;
size_t _size=0;
};
4.5central cache回收内存
当thread cache中的某个自由链表太长时,就会将这个自由链表中的对象还给central cache中的span。
但是在这个自由链表中,不是所有的内存块都是来自同一span的,所以我们需要知道这些对象到底是属于哪一个span的。
a、如何根据对象得到对应的页号
实际上一页是有很多地址的,在这个页内的所有地址除以一页的大小就会得到页号。
b、如何找到对应的span呢?
虽然上面我们可以找到对应的页,但是我们如何找到对应的span呢?因为一页上可管理多个span。所以我们就需要之前学到的知识——建立页号和span之间的映射。这个结构我们在page cache中进行span合并时页需要使用,因此就直接在page cache中添加即可。
//单例模式
class PageCache
{
public:
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
private:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
此时我们就既可以找到对应的页号,而且在unordered_map当中可以找到对应的span
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
std::unique_lock<std::mutex> lock(_pageMtx);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
c、central cache回收内存
当我们将thread cache中的对象还给central cache时,就可以依次去遍历这些对象,将这些对象插入对应的span中的自由链表中去,并且及时的更新span中的_useCount技术即可。当central cache中某个span中的_useCount为0时,就意味着这个span中所有的内存都已经返还回来了,就可以进一步将这个span还给page cache。
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanList[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
//说明span的切分出去的所有小块内存都回来了
//这个span就可以在回收给page cache ,pagecache可以再去尝试去做前后页的合并
if (span->_useCount == 0)
{
_spanList[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
//释放span给pagecache时,使用page cache的锁就可以了
_spanList[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanList[index]._mtx.lock();
}
start = next;
}
_spanList[index]._mtx.unlock();
}
4.6page cache回收内存
当central cache中的某个span的_useCount减到0了,那么就需要将该span还给page cache。但是有可能最后会造成在好多page cache中的span中挂了好多还回来(没用)的内存页,当要申请一个大块的内存页的时候,按道理是可以申请的但是因为在不同的span中就不能申请,就会造成内存碎片问题,所以我们就要将该处(span)附近的前后页进行合并,合并成大的页。
合并过程可以分为向前合并和向后合并
a、向前合并
- 假设我们还回来的内存块的页号为PageNum,该把内存块管理的页数为n,我们向前合并的时候就先判断PageNum-1页对应的span是否存在。
- 再进行判断是否没有被使用。
- 判断和前面加起来的页数是否会超过最大值(有可能是两个最大值的页数进行合并)
- 当这些条件都满足后就向前合并,重复上述步骤直到不能进行合并了。
- 当进行到这里的时候,更新起始页号,更新页数,释放被合并的span。
b、向后合并
和向前合并的步骤几乎是一样的,只是合并的方向不
我们需要一个新的标志来记录是否该span在被使用,如果使用central cache中的_useCount是否为0是不够准确的,因为有可能是central cache刚申请的,所以我们需要另外的标志进行标识。
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //是否在被使用
};
我们还需要建立page cache中span和页号的映射关系,但是和central cache不同的是,page cache需要建立一个span的首尾页和span的映射关系,因为在进行前后合并的时候,我们只需要知道该span的首页地址或者尾部地址。还有当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
前后合并代码:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//大于128 page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
//对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号没有,不合并了
if (ret == _idSpanMap.end())
{
break;
}
//前面相邻的span在使用,不合并
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
//合并出超过128页的span没办法管理,不合并
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
//向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n - 1;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (span->_n + nextSpan->_n > 128)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
合并结束后,我们除了将合并后的span挂到page cache对应的哈希桶的双链表中外,还要建立该span与其首尾页之间的映射关系,而且还要将该span的状态设置为没有被使用,方便此后继续合并。
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
五、大于256KB的大块内存申请问题
a、申请过程
最开始我们设计为,每个线程的thread cache是用来申请小于等于256KB的内存的,对于大于256KB的内存我们就需要向page cache申请,但是page cache中最大的页只有128页,因此如果申请大于128页的内存我们只能向堆申请。
但是在申请的内存大于256KB时,虽然不是从thread cache申请的,但是在分配内存时也是需要进行向上对齐的,对于大于256KB的内存我们可以直接按照页进行对齐。
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
//大于256KB的按页对齐
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
所以当我们申请大于256KB的内存时,就不用向thread cache申请,这时,我们需要将该内存转换为页数,按页进行对齐,然后再调用申请指定页数的span即可。
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES) //大于256KB的内存申请
{
//计算出对齐后需要申请的页数
size_t alignSize = SizeClass::RoundUp(size);
size_t kPage = alignSize >> PAGE_SHIFT;
//向page cache申请kPage页的span
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kPage);
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
b、释放过程
当我们要释放对象时,我们需要对对象的大小进行判断。
根据不同的对象大小,我们需要将对象释放给不同的缓存。
因此当释放对象时,我们需要先找到对象对应的span,但是在释放对象时我们只知道该对象的起始地址,所以当我们申请大于256KB的内存时,也要给申请的内存建立span结构,并且建立起始页号与该span之间的映射关系。此时我们就可以根据释放对象的起始地址计算起始页号,进而通过页号找到该对象的span。
六、释放对象时优化为不传对象大小
当我们使用malloc函数申请内存时,需要指明申请内存的大小;当我们使用free函数释放内存时,我们只需要传入这块内存的指针即可。但是我们目前实现的是,在释放的时候还需要传入该对象的大小。
因为:
- 如果我们释放的是大于256KB的对象,我们需要根据对象的大小来判断这块内存到底是应该还给page cache还是直接还给堆?
- 如果我们释放的是小于等于256KB的对象,需要根据对象的大小来计算出应该还给thread cache中的哪一个哈希桶。
如果我们想做到在释放对象时不用传入对象的大小,那么我们就需要建立对象地址和对象大小的映射关系。现在我们可以通过对象的地址找到其对应的span,而span的自由链表中挂的都是相同大小的对象。因此我们可以在Span的结构中增加一个_objSize成员,该成员是用来代表管理这个span的内存块被切成一个一个对象的大小。
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _objSize = 0; //切好的小对象的大小
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //是否在被使用
};
所有的span都是从pagecache中拿出来的,因此每当我们调用New Span获取一个K页的span时,就应该将这个span的_objSize保存下来。
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;
七、性能瓶颈分析及解决
a、瓶颈分析
我们通过VS编译器中自带的性能分析工具对该程序进行分析,分析后发现Deallocate和MapObjectToSpan这两个函数就占用了一半多的时间,我们再具体分析,发现Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的,在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的,ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的,通过观察,我们发现当我们调用该函数时因为锁的原因增加了消耗时间。针对锁的原因,我们使用基数树进行优化
b、使用基数树进行优化
基数树实际就是一个分层的哈希表,根据所分层层数的不同分为单层基数树、二层基数树、三层基数树等。
单层基数树
单层基数树是采用直接定址法,每一个页号对应span的地址就是存储数组中在以该页号为下标的位置。
二层基数树
三层基数树
为什么读取基数树映射关系时不需要加锁?
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。
因为C++中的map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的数据都会发生变化。比如红黑树在插入数据时会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对基数树来说就不一样的,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定位置进行读取的。并且我们不会同时对一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的,也就是说,读取映射时读取的都是对应的span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所有我们在基数树中不会同时对同一个页进行读取映射和建立映射的操作。