栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++并行多线程归并排序

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++并行多线程归并排序

 下面是源代码

template
void F(Iterator start, Iterator end) {
	std::sort(start, end);
}
template 
void merge(Iterator start, int length, int size)
{
	for (int i = 1; i < size; i++)
		std::inplace_merge(start, start + length * i, start + length * (i + 1));
}
template
void Sort_(Iterator start, Iterator end) {
	const size_t length = std::distance(start, end);
	size_t max_thread = std::min(length / 100, std::thread::hardware_concurrency());
	if (!max_thread)
		max_thread = 1;
	const size_t size = length / max_thread;
	std::vectorthreads;
	Iterator block_start = start;

	for (size_t i = 0; i < (max_thread - 1); i++) {
		Iterator block_end = block_start;
		std::advance(block_end, size);
		threads.emplace_back(F, block_start, block_end);
		block_start = block_end;
	}
	F(block_start, end);
	for (auto& i : threads)
		i.join();
	merge(start, size, max_thread);
}

 一直看书啥的最近人都有点迷糊,我们直接看代码吧,逻辑还是很简单

1、编写一个模板函数,调用algorithm的std::sort进行排序,其参数是两个迭代器,我们调用sort只是为了方便,想要自己写一个归并排序自然也可以,但是没有必要

2、写一个合并算法,因为我们想的是把一段很长的序列分成一段一段小的序列,用线程调用进行排序,那么最后线程排序完后就是一段一段有序的序列,所以需要一个模板函数进行合并。我们调用了algorithm的std::inplace_merge进行合并,函数的参数是:开始迭代器,每一项有序子序列的元素个数,有序子序列的数量,然后进行一个循环即可完成重复调用合并。std::inplace_merge基础使用方式如下

#include     
#include   
int main() {
    //该数组中存储有 2 个有序序列
    int first[] = { 5,10,15,20,25,7,17,27,37,47,57 };
    //将 [first,first+5) 和 [first+5,first+11) 合并为 1 个有序序列。
    std::inplace_merge(first, first + 5, first + 11);
    for (int i = 0; i < 11; i++) {
        std::cout << first[i] << " ";
    }
    std::cout << std::endl;
    int first_[]{ 57,47,37,27,17,10,9,8,7,6,5,4 };
    std::inplace_merge(first_, first_ + 5, first_ + 11, [](auto a, auto b) {return a > b; });
    for (auto i : first_)
        std::cout << i << ' ';
    return 0;
}
//只能处理有序序列,且有序是指如果要按照从小到大,那么这个数组的两个序列必须也是分别从小到大

3、进入正题,Sort_函数模板接收两个迭代器作为参数,调用std::distance算法计算两个迭代器的距离。max_thread为最大线程数,我们取距离即元素个数/100,和设备支持的最大线程数的其中更小的,线程并不是越多越好,也有开销,而且不能超过设备所支持的最大数。

if这里为了防止元素个数不足100,导致为0的情况,所以赋值1

4、size = length / max_thread,size是单个线程一次处理的元素数量,也就是长度/线程数

std::vectorthreads;
    Iterator block_start = start;

创建线程容器与开始迭代器,开始迭代器初始化为传入参数的start。

5、

for (size_t i = 0; i < (max_thread - 1); i++) {
        Iterator block_end = block_start;
        std::advance(block_end, size);
        threads.emplace_back(F, block_start, block_end);
        block_start = block_end;
    }

循环,先创建结束迭代器让开始迭代器进行初始化,然后使用std::advance对迭代器进行移动,移动的距离就是size,即线程一次处理的范围,vector容器进行匿名构造thread插入。

最后开始迭代器被赋值为结束迭代器,以此类推循环。

6、循环条件之所以是最大线程数-1是因为主线程也是线程,也进行计算。循环结束后主线程调用F进行排序:F(block_start, end);,即开始迭代器和末尾,也就是把最后一部分数据留给主线程计算,到此整个序列全部变为一个一个有序的子序列,此时就可以调用join成员函数确保其他线程正常退出,然后调用函数进行合并

for (auto& i : threads)
        i.join();
 merge(start, size, max_thread);

如此便完成了

测试时间效率代码

class auto_timer {
	std::chrono::system_clock::time_point start;

public:
	// start record when entering scope
	explicit auto_timer(const char* task_name = nullptr) {
		if (task_name) std::cout << task_name << " running , ";
		start = std::chrono::system_clock::now();
	}

	// end record when leaving scope
	~auto_timer() {
		auto cost = std::chrono::duration_cast(std::chrono::system_clock::now() - start);
		std::cout << "cost: " << double(cost.count()) / 1000000.0 << " ms" << std::endl;
	}
};

加上对照打印测试代码,如下:

#include
#include
#include
#include
#include
#include 
class auto_timer {
	std::chrono::system_clock::time_point start;

public:
	// start record when entering scope
	explicit auto_timer(const char* task_name = nullptr) {
		if (task_name) std::cout << task_name << " running , ";
		start = std::chrono::system_clock::now();
	}

	// end record when leaving scope
	~auto_timer() {
		auto cost = std::chrono::duration_cast(std::chrono::system_clock::now() - start);
		std::cout << "cost: " << double(cost.count()) / 1000000.0 << " ms" << std::endl;
	}
};

template
void F(Iterator start, Iterator end) {
	std::sort(start, end);
}
template 
void merge(Iterator start, int length, int size)
{
	for (int i = 1; i < size; i++)
		std::inplace_merge(start, start + length * i, start + length * (i + 1));
}
template
void Sort_(Iterator start, Iterator end) {
	const size_t length = std::distance(start, end);
	size_t max_thread = std::min(length / 100, std::thread::hardware_concurrency());
	if (!max_thread)
		max_thread = 1;
	const size_t size = length / max_thread;
	std::vectorthreads;
	Iterator block_start = start;

	for (size_t i = 0; i < (max_thread - 1); i++) {
		Iterator block_end = block_start;
		std::advance(block_end, size);
		threads.emplace_back(F, block_start, block_end);
		block_start = block_end;
	}
	F(block_start, end);
	for (auto& i : threads)
		i.join();
	merge(start, size, max_thread);
}
template
void Sort(std::vector& data)
{
	const intptr_t size = data.size();
	intptr_t stride = 2048;

	//从stride = 1开始归并排序非常缓慢
	//因此这里对data进行初排序
	//从一个较大的stride开始归并排序
	if (stride != 1) {
#pragma omp parallel for schedule(dynamic, 2048 / stride + 1)
		for (intptr_t i = 0; i < size; i += stride) {
			auto left = data.begin() + i;
			auto right = i + stride < size ? data.begin() + i + stride : data.end();
			std::sort(left, right);
		}
	}

	//并行归并排序
#pragma omp parallel
	{
		intptr_t _stride = stride;
		do
		{
			_stride *= 2;

#pragma omp for schedule(dynamic, 2048 / _stride + 1)
			for (intptr_t i = 0; i < size; i += _stride) {
				//对[i, i+_stride/2)和[i+_stride/2, i+_stride)进行归并
				auto left = data.begin() + i;
				auto mid = (i + i + _stride) / 2 < size ? data.begin() + (i + i + _stride) / 2 : data.end();
				auto right = i + _stride < size ? data.begin() + i + _stride : data.end();
				inplace_merge(left, mid, right);
			}
		} while (_stride < size);
	}
}
int main() {
	srand(time(0));
	std::vectorV;
	for (int i = 10000000; i > 0; i--)
		V.push_back(rand());
	
		//std::cout << std::endl;

	{
		auto_timer timer("并行排序");
		Sort_(V.begin(), V.end());
	}

	std::vectorV2;
	for (int i = 10000000; i > 0; i--)
		V2.push_back(rand());
	{
		auto_timer timer("库排序");
		std::sort(V2.begin(), V2.end());
	}
	std::vectorV3;
	for (int i = 10000000; i > 0; i--)
		V3.push_back(rand());
	{
		auto_timer timer("并行归并");
		Sort(V3);
	}
	std::vectorV4{ 10,9,8,7,6,5,4,3,2,1 };
	{
		{
			auto_timer timer("并行排序");
			Sort_(V4.begin(), V4.end());
			for (auto i : V4)
				std::cout << i << ' ';
		}
	}
	
}

其中一个归并并行的代码是复制的网上的做对照使用

顺带提一句测试时间的使用方式

{
     auto_timer timer("任务名");
    // 耗时代码

}

如有错误还请指正,MSVC,C++20,release

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/846871.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号