栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

【C++】从零实现一个高并发内存池

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【C++】从零实现一个高并发内存池

从零实现一个高并发内存池
  • 项目介绍
    • ◎项目的内容介绍
    • ◎要求的知识储备
    • ◎内存池的介绍
      • 1、池化技术
      • 2、内存池
      • 3、内存池主要解决的问题
      • 4、malloc解析
  • 设计思路
    • ◎第一阶段--设计一个定长的内存池
      • 适应平台的指针方案
    • ◎第二阶段--高并发内存池整体框架设计
      • 1.线程缓存(thread cache)
      • 2.中心缓存(central cache)
      • 3.页缓存(page cache)
    • ◎第三阶段--三级缓存的具体实现
      • 1.Thread Cache框架构建及核心实现
        • 申请与释放内存的规则及无锁访问
        • 管理内存对齐和映射等关系
          • ▶计算对齐大小映射的规则
          • ▶计算相应内存映射在哪一个哈希桶中
          • ▶代码实现
        • 自由链表的设计
        • thread cache框架构建
        • thread cache核心实现
      • 2.central cache框架构建及核心实现
        • 申请与释放内存规则
          • ▶慢开始算法
        • 管理多个大块内存的跨度结构Span及SpanList定义
        • central cache框架构建
        • central cache核心实现
      • 3.page cache框架构建及核心实现
        • 申请与释放内存
          • ▶直接向堆申请或释放以页为单位的大块内存
          • ▶Span跨度结构以页为单位管理从堆申请的内存
        • page cache框架构建
        • page cache核心实现
  • 细节与性能优化
    • ◎使用定长内存池配合脱离使用new
    • ◎解决内存大于256kb的申请释放问题
    • ◎使用基数树进行性能优化
  • 项目总结
    • ◎结果演示
      • ▶项目源码
    • ◎项目对比malloc性能高的原因
    • ◎项目扩展及缺陷
    • ◎收获与总结

项目介绍 ◎项目的内容介绍

这个项目是实现一个高并发内存池,参考了google的开源项目tcmalloc实现的简易版;其功能就是实现高效的多线程内存管理。由功能可知,高并发指的是高效的多线程,而内存池则是实现内存管理的,关于内存池接下来会进行详细介绍
tcmalloc源码

◎要求的知识储备

实现这个项目要求掌握C/C++、数据结构(链表与哈希桶)、操作系统的内存管理、单例模式、多线程以及互斥锁等方面的知识。

◎内存池的介绍 1、池化技术

我们知道,向系统申请和释放资源都有较大的开销,而池化技术就是程序先向系统申请过量的资源,而这些资源由我们自己管理,这样就能避免频繁的申请和释放资源导致的开销。
其实,在计算机中,除了我们上面所说的内存池,还有我们之前提到过的数据结构池,以及线程池、对象池、连接池等等,都利用了池化技术。

2、内存池

内存池指的是程序预先向操作系统申请足够大的一块内存空间;此后,程序中需要申请内存时,不需要直接向操作系统申请,而是直接从内存池中获取;同理,程序释放内存时,也不是将内存直接还给操作系统,而是将内存归还给内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

3、内存池主要解决的问题

由上可知,内存池首要解决的是效率问题,其次从系统的内存分配器角度出发,还需要解决内存碎片的问题。那么什么是内存碎片问题呢?
内存碎片分为外碎片和内碎片。

  • 外碎片由下图所示:对于我们申请的内存,可能因为频繁的申请和释放内存导致内存空间不连续,那么就会出现明明由足够大的内存空间,但是我们却申请不出连续的空间出来,这便是外碎片问题了。

  • 内碎片则是由于一些对齐的需求,导致分配出去的内存空间无法被利用,关于内碎片我们项目后面中会涉及,到时候会有更准确的理解。

4、malloc解析

我们在C语言中动态申请内存时,是通过malloc函数去申请内存的,但是实际上malloc并不是直接向堆申请内存的,而malloc本质上就是一个内存池,即向操作系统申请一大块内存,当程序将malloc管理的内存池中内存全部申请完后或者有大量内存需求时,malloc函数就会继续向操作系统申请空间。

设计思路 ◎第一阶段–设计一个定长的内存池

我们直到malloc函数申请内存空间是通用的,即任何场景下都可以使用,但是各方面都会就表示各方面都不顶尖,那么我们可以设计一个定长内存池来保证特定场景下的内存申请效率要高于malloc函数。

适应平台的指针方案

在这里,我们想取出一块对象内存中的前4个字节(32位系统)或者8个字节(64位系统)的内存来存储一个指针指向下一块释放回来的自由对象内存,那么在这里为了不作平台系统的判断,可以使用一个小技巧,即将对象内存强转成void**的类型,那么再对这个二级指针类型解引用就可以取出当前对象的前4个字节(32位系统)或8个字节(64位系统)。
由于这个操作之后会频繁使用,因此定义为内敛函数放在common.h头文件中方便调用:

static inline void*& NextObj(void* obj)
{
	return *(void**)obj;
}

由此,我们就可以设计出定长内存池的对象:

template //一次申请T大小的内存空间
class ObjectPool
{
public:
	T* New()
	{
		T* obj;
		//若自由链表有对象,则直接从自由链表中取
		if (_freeList != nullptr)
		{
			//头删
			obj = (T*)_freeList;
			_freeList = *(void**)_freeList;
		}
		else
		{
			if (_remainedBytes < sizeof(T))//当前内存池中没有足以分配的内存,需要申请
			{
				_remainedBytes = 8 * 1024;	
				//_memory = (char*)malloc(_remainedBytes);//申请定长(8Kb)的内存空间
				_memory = (char*)SystemAlloc(_remainedBytes >> PAGE_SHIFT);//申请定长(8Kb)的内存空间

			}
			//保证一次分配的空间够存放下当前平台的指针
			size_t unity = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			obj = (T*)_memory;
			_memory += unity;
			_remainedBytes -= unity;
		}
		//定位new显式调用T类型构造函数
		new(obj)T;
		return obj;
	}
	void Delete(T* obj)//将obj这块内存链接到_freeList中
	{
		//显式调用obj对象的析构函数,清理空间
		obj->~T();
		//将obj内存块头插
		*(void**)obj = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr;//存储一次申请一大块的内存,char*类型便于分配内存
	void* _freeList = nullptr;//将释放的内存回收链接
	size_t _remainedBytes;//_memory中剩余的内存空间
};

对于我们设计的定长内存池,我们可以通过下面的测试代码来比较一下malloc与定长内存池的效率:

void TestObjectPool()
{
	// 申请释放的轮次
	const size_t Rounds = 5;
	// 每轮申请释放多少次
	const size_t N = 1000000;
	size_t begin1 = clock();
	std::vector v1;
	v1.reserve(N);
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();
	ObjectPool TNPool;
	size_t begin2 = clock();
	std::vector v2;
	v2.reserve(N);
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < 100000; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();
	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}


可以明显的看出,定长内存池的开销是要低于malloc的,由此可见,在特定场景下,定长内存池的效率高于malloc函数。

◎第二阶段–高并发内存池整体框架设计

随着开发环境逐渐多核多线程,那么在申请内存的场景下,必然存在激烈的锁竞争问题。其实,malloc本身就已经足够优秀了,但我们项目的原型tcmalloc将在多线程高并发的场景下更胜一筹。而我们这次实现的内存池将考虑以下几方面的问题:

  • 1.性能问题
  • 2.多线程场景下的锁竞争问题
  • 3.内存碎片问题
    concurrent memory pool(并发内存池),主要有以下3个部分组成:
1.线程缓存(thread cache)

线程缓存是每个线程独有的,用于小于256kb内存的分配。那么对于每一个线程从thread cache申请资源,就无需考虑加锁问题,每个线程独享一个缓存(cache),这也是并发线程池高效的地方。

2.中心缓存(central cache)

中心缓存有所有线程所共享,thread cache 按需从central cache处获取对象,而central cache在合适的时机从thread cache处回收对象从而避免一个线程占用太多资源,导致其他线程资源吃紧,进而实现内存分配在多个线程更加均衡的按需调度。由于所有thread cache都从一个central cache中取内存对象,故central cache是存在竞争的,也就是说从central cache中取内存对象需要加锁,但我们在central cache这里用的是桶锁,且只有thread cache中没有对象后才会来central cache处取对象,因此锁的竞争不会很激烈。

3.页缓存(page cache)

页缓存是中心缓存上一级的缓存,存储并分配以页为单位的内存,central cache中没有内存对象时,会从page cache中分配出一定数量的page,并切割成定长大小的小块内存,给central cache。当page cache中一个span的几个跨度页都回收以后,page cache会回收central cache中满足条件的span对象,并且合并相邻的页,组成更大的页,从而缓解内存碎片(外碎片)的问题。

◎第三阶段–三级缓存的具体实现 1.Thread Cache框架构建及核心实现

thread cache是哈希桶结构,每个桶是一个根据桶位置映射的挂接内存块的自由链表,每个线程都会有一个thread cache对象,这样就可以保证线程在申请和释放对象时是无锁访问的。

申请与释放内存的规则及无锁访问
  • 申请内存
  1. 当内存申请大小size不超过256KB,则先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
  • 释放内存
    1.当释放内存小于256kb时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
    2.当链表的长度过长,则回收一部分内存对象到central cache。
  • tls - thread local storage
    线程局部存储(tls),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
//TLS: thread local storage,实现线程的无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
管理内存对齐和映射等关系 ▶计算对齐大小映射的规则

thread cache中的哈希桶映射比例比非均匀的,如果将内存大小均匀划分的话,则会划分出大量的哈希桶,比如256kb如果按照8byte划分,则会创建32768个哈希桶,这就有较大的内存开销;而如果按照更大的字节划分,那么内存开销虽然减少了,但照顾到的场景也少了,且会产生内碎片问题。
那么参考tcmalloc项目,为了保证内碎片的浪费整体控制在10%左右进行的区间映射,同时没有那么大的开销,对于内存的哈希映射关系如下:

	// 整体控制在最多10%左右的内碎片浪费
	// [1,128] 8byte对齐       freelist[0,16)
	// [128+1,1024] 16byte对齐   freelist[16,72)
	// [1024+1,8*1024] 128byte对齐   freelist[72,128)
	// [8*1024+1,64*1024] 1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024] 8*1024byte对齐   freelist[184,208)

也就是说,对于1byte到128byte的内存对象,按照8byte对齐,划分为下标0-15号的哈希桶,而129byte到1kb的内存对象,按照16byte对齐,划分下标16-71号的哈希桶,以此类推,最终划分为0-207号总共208个哈希桶,这样就保证了内存较小的开销,同时各个对齐关系中内碎片浪费控制在10%左右,比如129byte到144byte区间,取144byte的内存对象,浪费率为(144 - 129) / 144 = 10.42%,当然对于最开始的1byte申请8byte内存对象,虽然浪费高达87.5%,但考虑到最终内碎片浪费了7byte,对比后续内碎片一次浪费7kb来说可以忽略不计了。
这便是申请的内存对象大小对齐的映射关系,这个关系在后续central cache及page cache中仍有应用,因此可以将其定义在头文件common.h中,以后内存大小对齐的管理。

▶计算相应内存映射在哪一个哈希桶中

既然我们已经有了对齐内存的映射关系,那么我们需要通过这个映射关系去计算相应内存对象挂接在哪一个哈希桶中,在这里可以直接使用相关映射关系去进行加减乘除运算:

	//计算对应链桶的下标
	//static inline size_t _Index(size_t bytes, size_t alignNum)
	//{
	//	if (bytes % alignNum == 0)
	//	{
	//		return bytes / alignNum - 1;
	//	}
	//	else
	//	{
	//		return bytes / alignNum;
	//	}
	//}

但是参考tcmalloc源码,考虑到位移运算更加接近底层,效率更高,而实际应用中映射对应关系的计算是相当频繁的,因此使用位运算来改进算法。

▶代码实现
//管理对齐和映射关系
class SizeClass
{
public:
	// 整体控制在最多10%左右的内碎片浪费
	// [1,128] 8byte对齐       freelist[0,16)
	// [128+1,1024] 16byte对齐   freelist[16,72)
	// [1024+1,8*1024] 128byte对齐   freelist[72,128)
	// [8*1024+1,64*1024] 1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024] 8*1024byte对齐   freelist[184,208)

	static inline size_t _RoundUp(size_t bytes, size_t alignNum)
	{
		return (bytes + (alignNum - 1) & ~(alignNum - 1));//移位计算提高效率
	}

	//对齐大小的计算
	static inline size_t RoundUp(size_t size)//给定申请的内存大小,计算其对齐数,对齐规则如上
	{
		if (size <= 128)
		{
			return _RoundUp(size, 8);//按8byte对齐
		}
		else if(size <= 1024)
		{ 
			return _RoundUp(size, 16);//按16byte对齐
		}
		else if (size <= 8 * 1024)
		{
			return _RoundUp(size, 128);//按128byte对齐
		}
		else if(size <= 64 * 1024)
		{
			return _RoundUp(size, 1024);//按1024byte对齐
		}
		else if (size <= 256 * 1024)
		{
			return _RoundUp(size, 8 * 1024);//按8*1024byte对齐
		}
		else//大于256KB
		{
			//assert(false);
			return _RoundUp(size, 1 << PAGE_SHIFT);
		}
	}
	
	//计算在对应链桶中的下标
	static inline size_t _Index(size_t bytes, size_t align_shift)
	{
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	}

	//计算映射到哪一个自由链表桶
	static inline size_t Index(size_t size)
	{
		assert(size <= MAX_BYTE);
		static int group_array[4] = { 16,56,56,56 };//打表,每个区间有多少个链
		if (size <= 128)
		{
			return _Index(size, 3);
		}
		else if (size <= 1024)
		{
			return _Index(size - 128, 4) + group_array[0];
		}
		else if(size <= 8 * 1024)
		{
			return _Index(size - 1024, 7) + group_array[1] + group_array[0];
		}
		else if (size <= 64 * 1024)
		{
			return _Index(size - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
		}
		else if (size <= 256 * 1024)
		{
			return _Index(size - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		}
		else
		{
			assert(false);
		}
	}

	//计算向CentralCache一次获取批次的数量
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);//获取的内存对象总大小必须大于0
		size_t num = MAX_BYTE / size;//计算合理的内存对象数量
		//根据批次计算获取的数量会在[1,32768],范围过大,
		//因此控制获取的对象数量范围在区间[2, 512],较为合理
		//小对象获取的批次多,大对象获取的批次少
		if (num < 2)
			num = 2;
		if (num > 512)
			num = 512;
		return num;
	}

	//计算一次向PageCache申请多少页
	//size: 单个对象从8byte - 256kb
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size);//当前size所需批次数量
		size_t nPage = num * size;
		nPage >>= PAGE_SHIFT;
		if (nPage == 0)
			nPage = 1;
		return nPage;
	}
};
自由链表的设计

在有了上面的基础之后,我们来设计自由链表,其实就是实现一个单链表,方便插入删除,同时标识链表长度_size以方便后续释放流程,以及定义_maxSize来保住thread cache一次申请对象批次的下限。

class FreeList
{
public:
	void Push(void* obj)
	{
		//将归还的内存块对象头插进自由链表
		NextObj(obj) = _freeList;
		_freeList = obj;
		++_size;
	}	
	void PushRange(void* start, void* end, size_t size)
	{
		NextObj(end) = _freeList;
		_freeList = start;
		_size += size;
	}
	void* Pop()
	{
		assert(_freeList);
		//将自由链表中的内存块头删出去
		void* obj = _freeList;
		_freeList = NextObj(obj);
		--_size;
		return obj;
	}

	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n >= _size);
		start = _freeList;
		end = start;
		for (size_t i = 0; i < n - 1; i++)
		{
			end = NextObj(end);
		}
		_freeList = NextObj(end);
		_size -= n;
		NextObj(end) = nullptr;
	}

	bool Empty()
	{
		return _freeList == nullptr;
	}
	size_t& MaxSize()
	{
		return _maxSize;
	}
	size_t& Size()
	{
		return _size;
	}
private:
	void* _freeList = nullptr;
	size_t _maxSize = 1;//慢增长用于保住申请批次下限
	size_t _size = 0;//计算链表长度
};
thread cache框架构建

在有了上述基础后,我们来搭建thread cache的框架,其实就是一个哈希桶,每个桶中挂接着自由链表对象。
不过在thread cache中采用tls(thread local storage)即线程局部存储技术,来保证一个线程仅拥有一个全局的thread cache对象,从而可以使线程在向thread cache申请内存对象的时候实现无锁访问。

class ThreadCache
{
public:
	//申请和释放内存对象
	void* Allocate(size_t size);//从Thread Cache中申请size个Bytes的空间
	void DeAllocate(void* ptr, size_t size);

	void* FetchFromCentralCache(size_t index, size_t size);//向CentralCache中申请空间
	void ListTooLong(FreeList& list, size_t size);//释放对象导致链表过长时,归还内存到CentralCache
private:
	FreeList _freeLists[NFREELIST];//ThreadCache的自由链表
};
//TLS: thread local storage,实现线程的无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
thread cache核心实现

代码部分:thread cache核心方法实现

2.central cache框架构建及核心实现

central cache也是一个哈希表结构,其映射关系与thread cache是一样的,不同的是central cache的哈希桶位置所挂接的是SpanList链表结构,不过每个桶下的span对象被切分成了一块块小内存挂接在span对象的自由链表freeList中。

申请与释放内存规则
  • 申请内存
    1.当thread cache中没有内存后,就会向central cache中申请大块内存。这里的申请过程采用的是类似网络tcp协议拥塞控制的慢开始算法,而central cache中哈希映射的spanlist下挂着的span则向thread cache提供大块内存,而从span中取出对象给thread cache是需要加锁的,这里为了保证效率,提供的是桶锁。
▶慢开始算法
	//预计获取的批次数量
	//慢开始反馈调节算法
	//1、刚开始向CentralCache获取的批次数量较小,申请太多可能用不完
	//2、当size需求逐渐增大时,batchNum就不断增大,直到上限
	//3、size越小,一次向CentralCache申请的批次数量就较大
	//4、size越大,一次向CentralCache申请的批次数量就较小
	size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (_freeLists[index].MaxSize() == batchNum)
		_freeLists[index].MaxSize() += 1;

举个例子,线程申请7块大小相同的内存,第一次申请的批次数量为1块,第二次再来申请时,此时thread cache的自由链表下已经没有空闲的内存了,则又需要继续向central cache申请内存,申请的批次数量为2块,第3次直接从thread cache的自由链表中获取内存块;第4次再申请时,则需要向central cache申请内存,此时申请的批次数量为3块,挂接在thread cache的自由链表下,直到第7次来申请内存时,向central cache申请的内存批次数量为4,这时线程取走一块内存,则挂接在thread cache的自由链表下还有3块空闲的内存。

2.当central cache中映射的spanlist下所挂接的所有span对象都没有内存后,则需要向page cache申请一块新的span对象,central cache拿到这块span对象后会按照所管理内存的大小将span对象划分成多块,再挂接到central cache的审判list中;然后再从这块新申请的span对象中去内存分配给thread cache。
3.在这里,为了方便后续的回收,span对象每分配出一块内存,其成员变量_useCount就++;相反thread cache每释放归还一块内存后,_useCount就–。

  • 释放内存
    当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–_useCount。
    当_useCount变为0后,说明所有分配出去的内存都归还回来了,那么就将这个span对象归还给page cache,page cache会对归还回来的span进行前后页合并。
管理多个大块内存的跨度结构Span及SpanList定义

在上面我们知道central cache的哈希桶下挂接着的是跨度结构Span对象,其实Span对象是管理以页为单位的大块内存的结构体。而为了方便后续回收span对象到page cache,需要将任意位置span对象从spanlist中删除,那么将spanlist定义为一个双向链表更好一些。
由此,span及spanlist的定义如下:

//管理多个大块内存的跨度结构
struct Span
{
	PAGE_ID _pageId = 0;//大块内存起始页的id
	size_t _n = 0;//页的数量

	Span* _prev = nullptr;//双向链表结构
	Span* _next = nullptr;

	size_t _objSize = 0;//记录Span中所切的小块内存的大小

	bool _isUse = false;//标识这块span是否正在被使用

	size_t _useCount = 0;//当前Span已经被分配给Thread Cache的内存数量
	void* _freeList = nullptr;
};

//链接Span的带头双线链表结构
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	Span* Begin()
	{
		return _head->_next;
	}
	Span* End()
	{
		return _head;
	}
	bool Empty()
	{
		return _head == _head->_next;
	}

	void PushFront(Span* span)
	{
		Insert(_head->_next, span);
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}

	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);
		Span* prev = pos->_prev;
		prev->_next = newSpan;
		pos->_prev = newSpan;
		newSpan->_prev = prev;
		newSpan->_next = pos;
	}
	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head);
		Span* prev = pos->_prev;
		Span* next = pos->_next;
		prev->_next = next;
		next->_prev = prev;
	}
private:
	Span* _head;
public:
	std::mutex _mtx;//对每一个哈希桶都加一个锁以保证线程安全
};
central cache框架构建

在明确了span与spanlist的定义描述后,也不能设计出central cache的框架结构,central cache是一个哈希表结构,其映射的是spanlist与哈希桶位置(内存大小)的关系。
其次,在这里我们将central cache设计为单例模式,保证所有线程都只能看到一个central cache,从而提高效率。

class CentralCache
{
public:
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}

	//获取一个非空的Span
	Span* GetOneSpan(SpanList& list, size_t byte_size);

	//从CentralCache获取一定数量的内存对象给ThreadCache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	//将一定数量的对象释放到span跨度
	void RealeaseListToSpan(void* start, size_t size);

private:
	SpanList _spanLists[NFREELIST];//CentralCache的Span的分割方式与ThreadCache相同
private:
	CentralCache()
	{}
	CentralCache(const CentralCache&) = delete;//防拷贝
	static CentralCache _sInst;//单例模式
};
central cache核心实现

代码实现:central cache核心方法实现

3.page cache框架构建及核心实现

page cache与前两级缓存略有不同,其映射关系不再是哈希桶位置与自由链表或spanlist的映射,而是页号与spanlist的映射,这里我们设计的是128页的page cache。

申请与释放内存
  • 申请内存
    1.当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是1页page,1页page后面没有挂span,则向后面寻找更大的span,假设在100页page位置找到一个span,则将100页page的span分裂为一个1页page span和一个99页page span。
    2.如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
    3.需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
  • 释放内存
    如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
▶直接向堆申请或释放以页为单位的大块内存

这里我们为了避免使用malloc及free函数接口去向堆申请和释放内存,因此使用系统调用接口直接向堆申请和释放内存。
这里的系统调用接口在window下为VirtualAlloc与VirtualFree系统调用接口;在Linux系统下为mmap与munmap,brk与sbrk两对系统调用接口。

inline static void* SystemAlloc(size_t kPage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kPage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	//Linux下brk mmap等
#endif // _WIN32

	//抛异常
	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}
▶Span跨度结构以页为单位管理从堆申请的内存

我们向堆申请内存后会返回这块内存的起始地址,那么我们将这个地址看作一个无符号整型,将其除以8*1024作为Span结构的_pageId,再将申请内存时用的页号赋给_n,这里为了方便后续回收分配出去的Span跨度结构,我们使用STL的unordered_map来构建_pageId与Span对象的映射关系。

page cache框架构建

与central cache类似的是,page cache也是单例模式;不过page cache加的不是桶锁,而是整级加的一把大锁,即每次central cache向page cache申请内存时,page cache都要加锁防止出现安全问题。

#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"

class PageCache
{
public:
	static PageCache* GetInstance()
	{
		return &_sInst;
	}

	//建立从对象到span*的映射
	Span* MapObjectToSpan(void* obj);

	//获取一个k页的PageCache
	Span* NewPage(size_t k);

	//将CentralCache归还的span链接到PageCache对应的桶中,并且若其前后页有空闲,合并之
	void RealeaseSpanToPageCache(Span* span);

	//与CentralCache不同的是,若此处设置桶锁,会频繁的加锁解锁,故PageCache中整体设置一把锁
	std::mutex _pageMtx;


private:
	PageCache()
	{}
	PageCache(const PageCache&) = delete;

	//定义定长的span内存池以脱离使用new
	ObjectPool _spanPool;
	
	//std::unordered_map _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

	static PageCache _sInst;
	SpanList _spanlists[NPAGES];
};
page cache核心实现

代码实现:page cache核心方法实现

细节与性能优化 ◎使用定长内存池配合脱离使用new

我们定义一个Span结构体时是new一个对象,但new的本质是malloc,而我们这个项目是不依赖malloc的,因此我们可以运用我们自己实现的定长内存池来脱离new的使用。
对于Page Cache,由于要多次定义Span结构,因此我们定义一个特化Span对象的定长内存池:

	//定义定长的span内存池以脱离使用new
	ObjectPool _spanPool;

而对于Thread Cache,由于要保证对于线程而言,全局只有唯一一个Thread Cache对象,故在头文件内定义为静态变量的定长内存池:

		//静态成员,保证全局只有一个对象
		static ObjectPool tcPool;
		//pTLSThreadCache = new ThreadCache;
		pTLSThreadCache = tcPool.New();
◎解决内存大于256kb的申请释放问题

对于线程申请大于256kb内存的情况, 直接向堆申请即可:

	//若申请的内存大于256KB,则直接跳过CentralCache直接向Page Cache申请内存
	if (size > MAX_BYTE)
	{
		//先计算要申请的内存所对齐的大小
		size_t alignSize = SizeClass::RoundUp(size);//这里alignSize按size的对齐数向上对齐
		size_t kPage = alignSize >> PAGE_SHIFT;
		//向PageCache去申请内存
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewPage(kPage);
		span->_isUse = true;
		span->_objSize = size;
		PageCache::GetInstance()->_pageMtx.unlock();
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}

同样的,大于256kb的内存的释放就直接释放给堆即可:

	if (size > MAX_BYTE)
	{
		//找到ptr对应的那块span
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->RealeaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	//------------------------------------------
	//若释放的span大于256KB,即span的页数大于NPAGES - 1,则直接将内存释放到堆
	if (span->_n > NPAGES - 1)
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		//delete span;
		_spanPool.Delete(span);
		return;
	}
◎使用基数树进行性能优化

如果我们在Page Cache中使用STL的unordered_map容器来构建_pageId与span的映射关系,那么通过测试发现:

实际上我们的项目性能比不过malloc的。这主要是unordered_map是线程不安全的,因此多线程下使用时需要加锁,而大量加锁导致资源的消耗,故使性能降低。
因此,这里参考tcmalloc,使用基数树来进行性能的优化。
tcmalloc设计了三种基数树,即一层、二层与三层的基数树,其中一层和二层的基数树是适配32位系统下的,而三层基数树则是适配64位系统。
这里简单介绍以下一层和二层基数树,三层基数树类似于二层:

基数树相较于哈希桶的优势在于如果要写入_pageId和span
的映射关系的话,并不会像哈希桶可能有结构上的改变,即一个线程在读的时候,另一个线程在写,而是一旦基数树构建好映射关系后,就不会改变其结构,之后只会有读的操作,因此多线程环境下无需加锁,从而减少了资源的消耗,优化了性能。

//单层基数树
template 
class TCMalloc_PageMap1
{
private:
	static const int LENGTH = 1 << BITS;
	void** _array;

public:
	typedef uintptr_t Number;//存储指针的一个无符号整型类型
	explicit TCMalloc_PageMap1()//一次将数组所需空间开好
	{
		//计算数组开辟空间所需的大小
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		//由于要开辟的空间是2M,已经很大了,故直接想堆申请
		_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(_array, 0, size);
	}
	void Set(Number key, void* v)//key的范围是[0, 2^BITS - 1],_pageId
	{
		_array[key] = v;
	}
	void* Get(Number key) const
	{
		if ((key >> BITS) > 0)//若key超出范围或还未被设置,则返回空
		{
			return nullptr;
		}
		return _array[key];
	}
};

// Two-level radix tree
template 
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodes
	void* (*allocator_)(size_t);          // Memory allocator

public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() {
		//allocator_ = allocator;
		memset(root_, 0, sizeof(root_));

		PreallocateMoreMemory();
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_[i1] == NULL) {
				//Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
				//if (leaf == NULL) return false;
				static ObjectPool	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

// Three-level radix tree
template 
class TCMalloc_PageMap3 {
private:
	// How many bits should we consume at each interior level
	static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

	// How many bits should we consume at leaf level
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Interior node
	struct Node {
		Node* ptrs[INTERIOR_LENGTH];
	};

	// Leaf node
	struct Leaf {
		void* values[LEAF_LENGTH];
	};

	Node* root_;                          // Root of radix tree
	void* (*allocator_)(size_t);          // Memory allocator

	Node* NewNode() {
		Node* result = reinterpret_cast((*allocator_)(sizeof(Node)));
		if (result != NULL) {
			memset(result, 0, sizeof(*result));
		}
		return result;
	}

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
		allocator_ = allocator;
		root_ = NewNode();
	}

	void* get(Number k) const {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 ||
			root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
			return NULL;
		}
		return reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3];
	}

	void set(Number k, void* v) {
		ASSERT(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
		const Number i3 = k & (LEAF_LENGTH - 1);
		reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
	}

	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

			// Check for overflow
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
				return false;

			// Make 2nd level node if necessary
			if (root_->ptrs[i1] == NULL) {
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}

			// Make leaf node if necessary
			if (root_->ptrs[i1]->ptrs[i2] == NULL) {
				Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast(leaf);
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() {
	}
};
项目总结 ◎结果演示

可以看到通过基数树优化后的高并发内存池在性能上是要优于malloc函数的。

▶项目源码

高并发内存池项目源码

◎项目对比malloc性能高的原因

在对比二者之前,首先要简单了解malloc的底层原理,malloc底层是采用边界标记法将内存划分成很多块,从而对内存的分配与回收进行管理。简单来说,malloc分配内存时会先获取分配区的锁,然后根据申请内存的大小一级一级的去获取内存空间,最后返回,具体的实现可以参考下面这篇文章:
malloc 的底层实现(ptmalloc)
那么我们就可以总结出我们项目效率相对较高的几点原因:

    1. 第一级thread cache通过tls技术实现了无锁访问。
  • 2.第二级central cache加的是桶锁,可以更好的实现多线程的并行。
  • 3.第三级page cache通过基数树优化,有效减少了锁的竞争。
◎项目扩展及缺陷

首先,实际上在释放内存时由thread cache将自由链表对象归还给central cache只使用了链表过长这一个条件,但是实际中这个条件大概率不能恰好达成,那么就会出现thread cache中自由链表挂着许多未被使用的内存块,从而出现了线程结束时可能导致内存泄露的问题。
解决方法就是使用动态tls或者通过回调函数来回收这部分的内存,并且通过申请批次统计内存占有量,保证线程不会过多占有资源。
其次,我们可以将这个项目打成静态库或动态库替换调用系统调用malloc,不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:

void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))

因此所有malloc的调用都跳转到了tc_malloc的实现。
有些平台不支持这样的东西,需要使用hook的钩子技术来做。
具体参考:hook

◎收获与总结

通过本次项目的学习,我大致了解到了malloc的底层实现原理,理解了内碎片与外碎片问题,学习了三级缓存自顶向下的设计方案,慢增长的算法实现以及基数树使用减少锁竞争的问题。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/847991.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号