[源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(7) ---Distributed Hash之前向传播
0x00 摘要0x01 前文回顾0x02 总体逻辑
2.1 注释&思路2.2 总体代码 0x03 配置数据
3.1 CUB函数
3.1.1 cub::DeviceScan::InclusiveSum3.1.2 cub::DeviceSelect::If3.1.3 临时存储 3.2 配置数据 0x04 Lookup操作
4.1 提取数据4.2 查找
4.2.1 查找算子4.2.2 get_insert 4.3 combiner
4.3.1 为何要聚合4.3.2 设计准则4.3.3 Combiner代码
4.3.3.1 例子4.3.3.2 要点4.3.3.3 注释版代码4.3.3.4 并行操作 4.3.4 嵌入表大小 0x05 Reduce Scatter
5.1 背景知识5.2 代码 0x06 Combiner
6.1 AllReduce6.2 Forward Scale 0x07 总结0xEE 个人信息0xFF 参考
0x00 摘要在这系列文章中,我们介绍了 HugeCTR,这是一个面向行业的推荐系统训练框架,针对具有模型并行嵌入和数据并行密集网络的大规模 CTR 模型进行了优化。
其中借鉴了HugeCTR源码阅读 这篇大作,特此感谢。
本系列其他文章如下:
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器 --(1)
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器— (2)
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器—(3)
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器— (4)
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器— (5) 嵌入式hash表
[源码解析] NVIDIA HugeCTR,GPU版本参数服务器— (6) — Distributed hash表
0x01 前文回顾目前为止,逻辑如下:
现在我们知道了DistributedSlotSparseEmbeddingHash的基本结构,接下来看前向传播。
为了更好的说明,我们给出一个实际例子,假定一共有两个slot(User ID 和 Item ID),每个slot内部最长为4个元素,稠密向量长度 embedding_vec_size 是8。下面CSR文件之中,每行是一个slot,所以一共有两个样本,每个样本两行,假定batch size = 2,所以这两个样本一起训练。
* 40,50,10,20 * 30,50,10 * 30,20 * 10 * Will be convert to the form of: * row offset: 0,4,7,9,10 * value: 40,50,10,20,30,50,10,30,20,10
第一个样本包括:
40,50,10,20 # slot 1 30,50,10 # slot 2
第二个样本是
30,20 # slot 1 10 # slot 20x02 总体逻辑
前向传播的总体功能是:Embedded_lookuper负责本地gpu计算和查找嵌入向量,即用户输入->嵌入向量。这里只考虑 *train* 名字的各种变量,忽略 *evalute* 名字的各种变量,即只看训练逻辑。
2.1 注释&思路源码之中的注释如下:
我们翻译梳理逻辑如下 Read data from input_buffers_ -> look up -> write to output_tensors,具体就是:
从input_buffers_读取数据。具体是通过 filter_keys_per_gpu 来完成对 embedding_data_ 的一系列配置。从embedding之中进行 look up,即调用 functors_.forward_per_gpu 从本gpu的hashmap做lookup操作。
由 DistributedSlotSparseEmbeddingHash 的特点我们知道,因为当前gpu对应的数据key都在此gpu,所以此时不需要做节点间通信。这里 hash_tables_[i],hash_table_value_tensors_[i],hash_value_index_tensors_[i] 就是本地第 i 个GPU对应的hashmap组合。embedding_data_.get_value_tensors(is_train)[i] 就是从我们之前提到的GPU sparse input 内部提取的输入训练数据。进行本地规约。 做reduce scatter操作。每个gpu的数据是batch size条,但是每条数据里的每个slot只是一部分key,需要做reduce scatter操作,做完reduce scatter后,数据才是完整的,此时每个gpu上分到完整数据的一部分。写到output_tensors。
具体一些成员变量的定义需要回忆一下。
hash_value_index_tensors_ :embedding vector表的row index。就是低维矩阵的 row offset。
需要注意,其类型是 Tensors2,其类型是 std::vector 需要注意,其类型是 Tensors2,其类型是 std::vector 后续我们依然做简化,忽略多个 worker,多个 GPU 的情况。 前向传播总体代码如下: 本地多个GPU并行前向传播,每个线程对应一个GPU,多GPU进行。调用 filter_keys_per_gpu 完成完成了对 EmbeddingData 的配置,这里i就是GPU index,拿到本GPU对应的输入数据。调用 forward_per_gpu 从本gpu的hashmap做lookup操作。reduce scatter,做了之后,数据才是完整的,每个gpu上分到完整数据的一部分。all_reduce 操作,这是combiner=mean时需要继续处理。forward_scale 操作,做平均。
具体流程是: 之前,在EmbeddingData 初始化时候,只是配置了其成员函数 train_keys_,train_keys_ 就是前面提到的 sparse_input,就是CSR format对应的稀疏张量。 此时数据如下, embedding_offsets_ 和 train_output_tensors_ 都是预先分配的,我们假设 CSR 数据为 :40,50,10,20,30,50,10,30,20,CSR row offset 是 0,4,7,9。 我们首先要介绍几个cub库的方法,这是NVIDIA提供的函数库,用来操作CUDA,把一些常见方法用并行化来实现,比如数组求和,不并行计算就是从头查到尾,如果CUDA并行,则可以高速实现。其网址为:https://docs.nvidia.com/cuda/cub/index.html,配置数据中就采用其中了几个方法。 此函数作用是使用GPU来计算inclusive prefix sum。 使用举例如下: 函数实现为: 如果想深入研究,可以参见 https://nvlabs.github.io/cub/structcub_1_1_device_scan.html 。 此函数作用是:使用 select_op 函数,将相应的元素从 d_in 分割到一个分区序列 d_out。被复制到第一个分区的元素总数被写入 d_num_selected_out。 具体使用方法为,此例子中,小于7的放在第一个分区,分区内元素数目为5. 函数实现是 如果想深入研究,参见 https://nvlabs.github.io/cub/structcub_1_1_device_select.html 前面cub方法之中,都要有一个临时存储区域,因此 DistributedSlotSparseEmbeddingHash 之中有一个 DistributedFilterKeyStorage 就是用来达到这个目的。 DistributedFilterKeyStorage 定义如下: 具体构建方法如下: 在前向传播之中,首先就是在 filter_keys_per_gpu 之中使用 train_keys_ 来对其他成员变量进行配置,目的是拿到本GPU上 DistributedSlotSparseEmbeddingHash 对应的输入数据。回忆一下,EmbeddingData 的这几个成员变量 get_output_tensors,get_input_keys,get_row_offsets_tensors,get_value_tensors,get_nnz_array 都返回引用,这说明大部分成员变量都是可以被直接修改的。具体配置代码如下: 配置完成之后,得到如下,其中 train_value_tensors_ 对应了csr value,train_row_offsets_tensors_ 对应了csr row offset,从SparseTensor拷贝到 EmbeddingData。 结合我们例子,最后前向传播输入训练数据是: 此部分就是完成嵌入表 look up操作。现在EmbeddingData得到了各种配置,就是sparse input参数,所以可以利用其作为key,得到embedding vector了。这部分是在 forward_per_gpu 内部完成的。 这里用到了比如 get_row_offsets_tensors 这样的方法从 embedding_data_ 之中提取输入数据。从input_buffers_读取数据对应的提取数据代码如下,就是从GPU的sparse input csr数据中读取到输入数据,作为后续在hash table查找的key: 从 CSR 读取 offset 代码如下: 因为输入有几千万个,但是可能其中只有几百个才非零,所以hash表就是把这几千万个输入做第一次映射,可以减少大量内存空间。 目前代码来到了这里,就是利用哈希表来从输入CSR得到对应的嵌入向量。 forward_per_gpu 分为两部分:查找和内部规约。 forward_per_gpu 函数是用来具体做lookup的。从其注释可以看到其用途,就是我们之前分析过的。 这里的参数都是引用,可以修改外部数据,具体思路是: 首先使用 hash_key value (CSR format of input sparse tensors) 来调用 get_insert 去 hash table 之中查找,如果找到了,得到的就是 hash_value_index。这个value 是 低维 embedding表 的 row index。这部分代码是 hash_table.get_insert 相关。其实,这里没有用到get_insert 返回值,而是把 hash_key value 插进哈希表内部,得到一个映射,具体如何查找是通过 csr row offset完成。 hash_table.get_insert 如果在 hash_table 的内部数据结构之中找到了,就返回,如果没有找到,就插入一个递增的数值,这个数值被设置到 hash_value_index 之中。 然后通过 hash_value_index 作为 index,在 hash_table_value 之中得到最终的 embedding vector,并且先在slot内部做reduce。这部分代码是 forward_sum 和 forward_mean 相关。 所以 hash_table_value_tensors_[i], hash_value_index_tensors_ 这两部分何时设置?其实是在forward_per_gpu完成的,具体逻辑如图: 具体代码是: 算子内部也分为 get_insert 来处理哈希表,和 combiner 处理,我们一一看看。 前面我们分析了哈希表的 get 和 insert 操作,这里是合而为一,就是如果找不到就插入。开始训练时候,不需要给哈希表赋初值,而是在训练过程之中使用get_insert动态插入。 我们再回忆下原理。 比如一共有1亿个单词,40表示第40个单词。如果想表示 10,30,40,50,20在这一亿个单词是有效的,最常见的办法是弄个1亿长度数组,把40,50,20,30,10这5个位置设置为1,其他位置设置为0。对应嵌入矩阵也是一个高维矩阵,比如 1亿 x 64 维度矩阵。 如果想省空间,就弄会构建一个小数据结构(低维矩阵)来存储这些有意义的值,弄一个一个hash函数 m_hf来做"从高维矩阵到低维矩阵的转换",就是10 -->?,20 --> ? 等。 假如是选取十位数数为key,对于我们的例子,就是 1,2,3,4,5 就是内部的hash_value,叫做 hash_value(对应下面代码),对应的内部存储数组叫做 hashtbl_values。但是因为分桶了,所以在哈希表内部是放置在hashtbl_values之中(这里我们做了一个简化,就是 hashtbl_values[i] = i)。 以上说的是哈希表,我们回到 DistributedSlotSparseEmbeddingHash 本身,于是1,2,3 (数组之中的内容,不是数组index,简化成恰好相等)就是DistributedSlotSparseEmbeddingHash 想得到的 10, 20, 30 对应的数据,就是10 放在低维嵌入表第一个位置,20放在低维嵌入表第二个位置,就是就是低维矩阵的row offset。即,hash_value_index 的内容是 [1,2,3,4,5],这些是原始输入数据 10,20,30,40,50 分别在 hash_table_value 之中对应的index,因此,10 对应的就是 hash_table_value[1],20 对应就是 hash_table_value[2],依此类推。 再返回哈希表,NvHashTable 的 get_insert 代码如下。 HashTable 的 get_insert 位于 sparse_operation_kit/kit_cc/kit_cc_infra/src/hashtable/nv_hashtable.cu。这里是在GPU进行并行操作,提取value。 最后还是来到 HugeCTR/include/hashtable/cudf/concurrent_unordered_map.cuh。如果没有value,就生成一个value。 具体逻辑演进如下: 拿到了多个向量之后,需要做聚合,因为此处过于繁琐,因此我们单独拿出来说一下,把它提升到和查找一个级别,大家不要误会。 在CTR领域,人们通常会把多个embedding向量合并成一个向量,这就是pooling。比如用户看了3本艺术书,2本体育书,所以 读书习惯 = 3 * 艺术 + 2 * 体育。这种聚合经常使用加权的pooling,而不是concat。因为虽然concat效果更好,但是pooling更快,而且这样做好处就是即使向量长度不同,也可以生成一个同样长度的新张量。比如:特征的embeddingSize是10,现在所有Field的个数是50,其中5个Field是序列形式的特征(对于序列长度的上限取40)。此时你有两种处理方式: mean/sum pooling :embedding层的参数量是10 * 50 = 500 concat :embedding层的参数量是 10*(50-5) + 40 * 10 * 5 = 2450 如果使用 concat,则embedding层的参数量直接涨了4倍左右,实际ctr模型种参数量最大部分一般就是embedding -> MLP的这一层,所以concat会直接拖慢线上inference的速度。 我们回忆一下前面提到的设计准则:嵌入表可以被分割成多个槽(或feature fields)。为了在不同的嵌入上获得最佳性能,可以选择不同的嵌入层实现。 LocalizedSlotEmbeddingHash:同一个槽(特征域)中的特征会存储在一个GPU中,这就是为什么它被称为“本地化槽”,根据槽的索引号,不同的槽可能存储在不同的GPU中。 DistributedSlotEmbeddingHash:所有特征都存储于不同特征域/槽上,不管槽索引号是多少,这些特征都根据特征的索引号分布到不同的GPU上。这意味着同一插槽中的特征可能存储在不同的 GPU 中,这就是将其称为“分布式插槽”的原因。由于需要全局规约,所以DistributedSlotEmbedding 适合 embedding 大于 GPU 内存大小的情况,因而DistributedSlotEmbedding 在 GPU 之间有更多的内存交换。 一定要注意,LocalizedSlotEmbeddingHash 和 DistributedSlotEmbeddingHash 的区别在于同一个槽(特征域)中的特征 是不是 会存储在同一个GPU中。比如,有 2 张GPU卡,有4个slot。 local模式 :GPU0存slot0和slot1,GPU1存slot2和slot3。distribute模式 :每个GPU都会存所有slot的一部分参数,通过哈希方法决定如何将一个参数分配到哪个GPU上。
在嵌入查找过程中,属于同一槽的稀疏特征输入在分别转换为相应的密集嵌入向量后,被简化为单个嵌入向量。然后,来自不同槽的嵌入向量连接在一起。这个就是前面提到的combiner操作。 现在已经拿到了 embedding table 的 index,需要看看如何拿到 embedding vector,如何仅需操作。 具体是通过 forward_sum 和 forward_mean 完成,我们用 forward_sum 举例看看。 上面代码之中需要注意两个注释 grid_size = batch_size; // each block corresponds to a sampleconst size_t block_size = embedding_vec_size; // each thread corresponds to one element in an embedding vector
4.3.3.1 例子
回忆我们的例子: 第一个样本包括: 第二个样本是 所以,应该得到10个稠密向量,比如40有一个稠密向量,50有一个稠密向量。 怎么知道 40 对应低维嵌入表的哪一行呢?通过一个哈希表来处理的,假如哈希函数是选取十位数为key,则得到: 所以,就知道了,40应该在低维嵌入表的第4行(我们对哈希表做了简化)。 forward_sum_kernel 的代码如下,这里代码很烧脑,需要结合注释仔细分析, 第一个要点是回忆一下hash_value_index_tensors_的使用: 细心读者可能有疑问,如果哈希表能从高维offset映射到低维offset,这个hash_value_index_tensors_ 应该就没有用了吧?这里解释如下: 事实上,因为解耦合的原因,hash_value_index_tensors_ 并不应该知道 哈希表内部把高维矩阵的维度映射了多大的低维矩阵,而 hash_value_index_tensors_ 大小也不应该随之变化。所以,hash_value_index_tensors_ 大小被固定为:batch_size * nnz_per_slot,可以认为就是CSR之中元素个数。所以 hash_value_index_tensors_ 实际上记录了每个元素对应的低维矩阵offset 数值,hash_value_index_tensors_ 事实上就是和CSR之中元素位置一一对应。因此,最终嵌入表查找时候,是通过CSR row offset 来找到 CSR之中每个元素,从而也找到了hash_value_index_tensors_ 这个表的index,从而就能找到其低维矩阵offset。针对我们的例子,hash_value_index_tensors_ 的数值就是 4,5,1,2,3,5,1,3,2,1。
其余几个要点是: bid 是第几个样本。tid 是最终嵌入向量的第几个元素,一个线程处理嵌入向量的一个元素。hash_value_index 是低维嵌入表的offset表的指针。
hash_value_index 是一张表,就是上面说的hash_value_index_tensors_。 row_offset 是CSR offset,例子就是 0,4,7,9,10,所以对于第二个样本,row offset 是 7,9。hash_table_value 可以认为是一个数组,低维嵌入矩阵是存储在这个数组之中。hash_table_value[value_index * embedding_vec_size] 就是对应的稠密向量。
4.3.3.3 注释版代码
关于并行操作,留意点是: bid是第几个样本。 tid 是最终嵌入向量的第几个元素,一个线程处理嵌入向量的一个元素。 hash_table_value[value_index * embedding_vec_size] 就是 CSR user ID对应的稠密向量。 hash_table_value[value_index * embedding_vec_size + tid] 就是 CSR user ID对应的稠密向量的第 tid 个element。 之前说了,应该是两个样本一共10个元素 40,50,10,20,30,50,10,30,20,10,应该对应10个稠密向量。但是在GPU之中会启动tid个线程并行操作,会在一个slot之中进行reduce,然后把结果存入到 embedding_feature 之中。GPU并行体现在同时生成一个稠密向量的所有元素。就是每一个sample生成 slot_num 个稠密向量。稠密向量的每个元素都是根据样本内部元素计算出来的。 比如第一个样本是: slot 1 应该输出 40 对应的稠密向量 + 50 对应的稠密向量 + 10 对应的稠密向量 + 20 对应的稠密向量。 slot 2 应该输出 30 对应的稠密向量 + 50 对应的稠密向量 + 10 对应的稠密向量。 但经过 combiner之后,样本1输出了两个稠密向量,分别对应两个slot,假定每个稠密向量长度是8,计算方式是: 稠密向量1 = 40 对应的稠密向量 + 50 对应的稠密向量 + 10 对应的稠密向量 + 20 对应的稠密向量 稠密向量2 = 30 对应的稠密向量 + 50 对应的稠密向量 + 10 对应的稠密向量 稠密向量1内部8个元素分别是由40,50,10,20对应的稠密向量8个同位置上元素的和构成。即 稠密向量1的[0] = sum(40 对应的稠密向量的[0], 50 对应的稠密向量的[0], 10 对应的稠密向量的[0], 20 对应的稠密向量的[0] )。可以看到,其确实转成了嵌入式向量,但并不是用矩阵乘法,而是用了自己一套机制,具体入下图: 我们已经知道可以通过哈希表来进行缩减嵌入表大小,现在又知道其实还可以通过combine来继续化简,所以在已经有了哈希表基础之上,我们需要先问几个问题。 目前 hash_table_value 究竟有多大?就是权重矩阵(稠密矩阵)究竟多大?embedding_feature (嵌入层前向传播的输出)究竟有多大?就是输出的规约之后的矩阵应该有多大?embedding_feature 的每一个元素是怎么计算出来的?实际矩阵有多大?
我们解答一下。 第一个问题hash_table_value 究竟有多大?
前文之中有分析 hash_table_value 大小是:max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu; 实际上,大致可以认为,hash_table_value 的大小是:(value number in CSR) * (embedding_vec_size) 。 hash_table_value 的数值是随机初始化的。每一个原始的 CSR user ID 对应了其中的 embedding_vec_size 个元素。hash_value_index 和 row_offset 凑在一起,就可以找到每一个原始的 CSR user ID 对应了其中的 embedding_vec_size 个元素。 第二个问题:embedding_feature 究竟有多大?就是逻辑上的稠密矩阵究竟有多大?从代码可以看到,
可见,embedding_feature 的大小是:(row number in CSR) * (embedding_vec_size) 。因此,对于 embedding_feature_tensors_,我们抽象一下,输入假设是4行 CSR格式,则输出就是4行稠密向量格式。 第三个问题:embedding_feature 的每一个元素是怎么计算出来的?
是遍历slot和element,进行计算。 第四个问题:实际embedding矩阵,或者说工程上的稠密矩阵有多大?
其实就是 slot_num * embedding_vec_size。row number 其实就是 slot_num。从下面输出可以看到。 以 deep_data 为例,其slot num 是26,embedding_vec_size = 16,最后输出的一条样本大小是 [26 x 16]。 输出: 现在每个GPU之上都得到了自己样本对应的稠密向量,记录在 embedding_feature_tensors_ 之上。每个GPU的数据是 batch size 条,每条有 slot number 个稠密向量,我们现在回忆一下: DistributedSlotEmbeddingHash:所有特征都存储于不同特征域/槽上,不管槽索引号是多少,这些特征都根据特征的索引号分布到不同的GPU上。这意味着同一插槽中的特征可能存储在不同的 GPU 中,这就是将其称为“分布式插槽”的原因。由于需要全局规约,所以DistributedSlotEmbedding 适合 embedding 大于 GPU 内存大小的情况,因而DistributedSlotEmbedding 在 GPU 之间有更多的内存交换。 GPU之上每个样本数据之中的slot只是slot的一部分数据,我们给出一个例子。我们假设一共有2个gpu,batch size为2,一共3个slot。有两个样本,拿第一个样本为例,slot 1有两个key,分别是GPU 1 上的1,GPU 2上的7。所以需要把这两个key进行归并操作。具体如下: 每条数据里面的每个slot都只是一部分key,同一插槽中的特征可能存储在不同的 GPU 中,这些特征都根据特征的索引号分布到不同的GPU上。这样就需要把GPU 1,GPU 2之上的数据进行合并,做完reduce scatter后,数据应该是完整的,并且每个gpu上只分到一部分完整的数据。 关于 Reduce Scatter 的原理,请参见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/operations.html。这里只是大致介绍。 Reduce操作的作用是对所有计算节点上的数值进行归约操作,并且只将归约后的结果保存到主节点上。和 AllReduce 的操作其实一样,只不过是把结果只放到root上而已。 ReduceScatter 操作执行与Reduce操作相同的操作,只是结果分散在rank之间的相等块中,每个rank根据其rank索引获得一块数据,即每个rank只受到reduce结果的一部分数据。 或者说,ReduceScatter 就是先做Scatter,将数据切分成同等大小的数据块,再依据Rank Index 对每一个rank所获得的数据做Reduce。这类似于全聚集,但是并不是将数据简单拼接到一起而是做了规约操作(比如,求和或最大值操作)。 或者参见下图,来自NVIDIA文档 https://images.nvidia.cn/events/sc15/pdfs/NCCL-Woolley.pdf。对所有GPU上的数据进行reduce操作,这里是sum,然后将结果切分到所有的GPU上。 具体代码如下,是对 embedding_feature_tensors_ 进行 reduce scatter,结果放在 embedding_data_.get_output_tensors(is_train) 之上。 reduce_scatter 算子代码是,这里是sum操作: 我们用图例来展示一下目前过程,为了更好的理解,这里我们可以把Reduce-Scatter分段考虑, Reduce 就是类似AllReduce操作,这个之后,所有GPU之上拥有所有数据。 Scatter 则按照 rank 来对样本进行分配,所以GPU 1 之上是Sample 1,GPU 2之上是Sample 2。 我们最后归纳整体如下: 如果需要做 mean pooling,则需需要做两个操作。 第一个操作是对CSR row offset 做一个AllReduce,这样就相当于是一个全局offset了,就可以拿到每个sample每个slot里的key的总个数。 第二个操作是Forward Scale,就是把embedding的值除以这个"个数",也就等于做了平均。 AllReduce 结果如下: 回忆一下 CSR 例子。 row_offset 的数字就是:第一行起始位置是0,第二行起始位置是4,第三行起始位置是7… 我们假设这是在Node 1之上。 如果Node 2的row_offset为 0,5,7,10,11,说明在这个Node之上,第一行起始位置是0,第二行起始位置是5,第三行起始位置是7…,对应CSR是: 做了AllReduce之后,得到:0,9,14,19,21。这样就知道第一个行总个数是9个,第二行总个是是7+7-9 = 5个。 具体算子如下: 最后要做一步 Forward Scale 操作。 前面我们做了AllReduce之后,得到 row_offset_allreduce_tensors_ 是 0,9,14,19,21。这样就知道第一个行总个数是9个,第二行总个是是7+7-9 = 5个。就可以对embedding_data_.get_output_tensors(is_train)的每个元素进行操作,每个元素都除以本slot的元素总数,就是做mean了。 算子如下: 最终结果如下,图有几个被简化的地方,比如hash_table_value_tensors_ 应该是向量的向量,这里简化为向量。 embedding vector数值也是虚拟的。嵌入层的最终输出是在 EmbeddingData 的成员变量 train_output_tensors_ 之上。 或者从下面来看。 ★★★★★★关于生活和技术的思考★★★★★★ 微信公众账号:罗西的思考 如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。 快过HugeCTR:用OneFlow轻松实现大型推荐系统引擎 https://nvlabs.github.io/cub/annotated.html https://developer.nvidia.com/blog/introducing-merlin-hugectr-training-framework-dedicated-to-recommender-systems/ https://developer.nvidia.com/blog/announcing-nvidia-merlin-application-framework-for-deep-recommender-systems/ https://developer.nvidia.com/blog/accelerating-recommender-systems-training-with-nvidia-merlin-open-beta/ HugeCTR源码阅读 embedding层如何反向传播 https://web.eecs.umich.edu/~justincj/teaching/eecs442/notes/linear-backprop.html 稀疏矩阵存储格式总结+存储效率对比:COO,CSR,DIA,ELL,HYB 无中生有:论推荐算法中的Embedding思想 tf.nn.embedding_lookup函数原理 求通俗讲解下tensorflow的embedding_lookup接口的意思? 【技术干货】聊聊在大厂推荐场景中embedding都是怎么做的 ctr预估算法对于序列特征embedding可否做拼接,输入MLP?与pooling 推荐系统中的深度匹配模型 土法炮制:Embedding 层是如何实现的? 不等距双杆模型_搜索中的深度匹配模型(下) 深度特征 快牛策略关于高低层特征融合 [深度学习] DeepFM 介绍与Pytorch代码解释 deepFM in pytorch 推荐算法之7——DeepFM模型 DeepFM 参数理解(二) 推荐系统遇上深度学习(三)–DeepFM模型理论和实践 [深度学习] DeepFM 介绍与Pytorch代码解释 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/operations.html 带你认识大模型训练关键算法:分布式训练Allreduce算法
void forward(bool is_train, int eval_batch = -1) override {
// Read data from input_buffers_ -> look up -> write to output_tensors
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
{ // 本地多个GPU并行前向传播
// 每个线程对应一个GPU,多GPU进行
size_t i = omp_get_thread_num(); // 拿到本线程序号
CudaDeviceContext context(embedding_data_.get_local_gpu(i).get_device_id());
if (embedding_data_.embedding_params_.is_data_parallel) {
// 这里完成了对 EmbeddingData 的配置,这里i就是GPU index
filter_keys_per_gpu(is_train, i, embedding_data_.get_local_gpu(i).get_global_id(),
embedding_data_.get_resource_manager().get_global_gpu_count());
}
// 从本gpu的hashmap做lookup操作
// 这里 hash_tables_[i],hash_table_value_tensors_[i],hash_value_index_tensors_[i] 就是对应的hashmap
functors_.forward_per_gpu(embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, 0, is_train,
embedding_data_.get_row_offsets_tensors(is_train)[i],
embedding_data_.get_value_tensors(is_train)[i],
*embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
hash_table_value_tensors_[i], hash_value_index_tensors_[i],
embedding_feature_tensors_[i],
embedding_data_.get_local_gpu(i).get_stream());
}
// do reduce scatter
// 做了之后,数据才是完整的,每个gpu上分到完整数据的一部分
size_t recv_count = embedding_data_.get_batch_size_per_gpu(is_train) *
embedding_data_.embedding_params_.slot_num *
embedding_data_.embedding_params_.embedding_vec_size;
functors_.reduce_scatter(recv_count, embedding_feature_tensors_,
embedding_data_.get_output_tensors(is_train),
embedding_data_.get_resource_manager());
// scale for combiner=mean after reduction
if (embedding_data_.embedding_params_.combiner == 1) {
size_t send_count = embedding_data_.embedding_params_.get_batch_size(is_train) *
embedding_data_.embedding_params_.slot_num +
1;
functors_.all_reduce(send_count, embedding_data_.get_row_offsets_tensors(is_train),
row_offset_allreduce_tensors_, embedding_data_.get_resource_manager());
// do average
functors_.forward_scale(
embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
}
return;
}
template
* int num_items; // e.g., 7
* int *d_in; // e.g., [8, 6, 7, 5, 3, 0, 9]
* int *d_out; // e.g., [ , , , , , , ]
* ...
*
* // Determine temporary device storage requirements for inclusive prefix sum
* void *d_temp_storage = NULL;
* size_t temp_storage_bytes = 0;
* cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items);
*
* // Allocate temporary storage for inclusive prefix sum
* cudaMalloc(&d_temp_storage, temp_storage_bytes);
*
* // Run inclusive prefix sum
* cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes, d_in, d_out, num_items);
*
* // d_out <-- [8, 14, 21, 26, 29, 29, 38]
template <
typename InputIteratorT,
typename OutputIteratorT>
CUB_RUNTIME_FUNCTION
static cudaError_t InclusiveSum(
void* d_temp_storage, ///< [in] %Device-accessible allocation of temporary storage. When NULL, the required allocation size is written to p temp_storage_bytes and no work is done.
size_t& temp_storage_bytes, ///< [in,out] Reference to size in bytes of p d_temp_storage allocation
InputIteratorT d_in, ///< [in] Pointer to the input sequence of data items
OutputIteratorT d_out, ///< [out] Pointer to the output sequence of data items
int num_items, ///< [in] Total number of input items (i.e., the length of p d_in)
cudaStream_t stream = 0, ///< [in] [optional] CUDA stream to launch kernels within. Default is stream0.
bool debug_synchronous = false) ///< [in] [optional] Whether or not to synchronize the stream after every kernel launch to check for errors. May cause significant slowdown. Default is p false.
{
// Signed integer type for global offsets
typedef int OffsetT;
return DispatchScan
* // Functor type for selecting values less than some criteria
* struct LessThan
* {
* int compare;
*
* CUB_RUNTIME_FUNCTION __forceinline__
* LessThan(int compare) : compare(compare) {}
*
* CUB_RUNTIME_FUNCTION __forceinline__
* bool operator()(const int &a) const {
* return (a < compare);
* }
* };
*
* // Declare, allocate, and initialize device-accessible pointers for input and output
* int num_items; // e.g., 8
* int *d_in; // e.g., [0, 2, 3, 9, 5, 2, 81, 8]
* int *d_out; // e.g., [ , , , , , , , ]
* int *d_num_selected_out; // e.g., [ ]
* LessThan select_op(7);
* ...
*
* // Determine temporary device storage requirements
* void *d_temp_storage = NULL;
* size_t temp_storage_bytes = 0;
* cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes, d_in, d_out, d_num_selected_out, num_items, select_op);
*
* // Allocate temporary storage
* cudaMalloc(&d_temp_storage, temp_storage_bytes);
*
* // Run selection
* cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes, d_in, d_out, d_num_selected_out, num_items, select_op);
*
* // d_out <-- [0, 2, 3, 5, 2, 8, 81, 9]
* // d_num_selected_out <-- [5]
template <
typename InputIteratorT,
typename OutputIteratorT,
typename NumSelectedIteratorT,
typename SelectOp>
CUB_RUNTIME_FUNCTION __forceinline__
static cudaError_t If(
void* d_temp_storage, ///< [in] %Device-accessible allocation of temporary storage. When NULL, the required allocation size is written to p temp_storage_bytes and no work is done.
size_t &temp_storage_bytes, ///< [in,out] Reference to size in bytes of p d_temp_storage allocation
InputIteratorT d_in, ///< [in] Pointer to the input sequence of data items
OutputIteratorT d_out, ///< [out] Pointer to the output sequence of partitioned data items
NumSelectedIteratorT d_num_selected_out, ///< [out] Pointer to the output total number of items selected (i.e., the offset of the unselected partition)
int num_items, ///< [in] Total number of items to select from
SelectOp select_op, ///< [in] Unary selection operator
cudaStream_t stream = 0, ///< [in] [optional] CUDA stream to launch kernels within. Default is stream0.
bool debug_synchronous = false) ///< [in] [optional] Whether or not to synchronize the stream after every kernel launch to check for errors. May cause significant slowdown. Default is p false.
{
typedef int OffsetT; // Signed integer type for global offsets
typedef NullType* FlagIterator; // FlagT iterator type (not used)
typedef NullType EqualityOp; // Equality operator (not used)
return DispatchSelectIf
template
template
template
3.2 配置数据
template
functors_.forward_per_gpu(embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, 0, is_train,
embedding_data_.get_row_offsets_tensors(is_train)[i],
embedding_data_.get_value_tensors(is_train)[i],
*embedding_data_.get_nnz_array(is_train)[i], *hash_tables_[i],
hash_table_value_tensors_[i], hash_value_index_tensors_[i],
embedding_feature_tensors_[i],
embedding_data_.get_local_gpu(i).get_stream());
}
4.1 提取数据
Tensors2
Tensors2
@param row_offset row_offset (CSR format of input sparse tensors)
@param hash_key value (CSR format of input sparse tensors)
@param nnz non-zero feature number per batch
@param hash_table hash table, pairs of
template
m_hf(10)=1
m_hf(20)=2
m_hf(30)=3
m_hf(40)=4
m_hf(50)=5
hashtbl_values[1] = 1,hashtbl_values[2] = 2, hashtbl_values[3] = 3,...
template
template
// __forceinline__ 的意思是编译为内联函数
// __host__ __device__ 表示是此函数同时为主机和设备编译
template
// do sum reduction
if (combiner == 0) { // 0-sum; 1-mean 这里是combiner类型
// 然后利用 hash_value_index 从 hash_table_value 之中得到 value,再进行操作
forward_sum(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
hash_value_index.get_ptr(), hash_table_value.get_ptr(),
embedding_feature.get_ptr(), stream);
} else if (combiner == 1) {
// 然后利用 hash_value_index 从 hash_table_value 之中得到 value,再进行操作
forward_mean(batch_size, slot_num, embedding_vec_size, row_offset.get_ptr(),
hash_value_index.get_ptr(), hash_table_value.get_ptr(),
embedding_feature.get_ptr(), stream);
}
// do sum reduction
template
* 40,50,10,20
* 30,50,10
* 30,20
* 10
* Will be convert to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10
40,50,10,20 # slot 1
30,50,10 # slot 2
30,20 # slot 1
10 # slot 2
m_hf(40)=4
// forward kernel funcion: for both combiner=sum and combiner=mean
template
4.3.3.4 并行操作
40,50,10,20 # slot 1
30,50,10 # slot 2
embedding_feature[feature_row_index * embedding_vec_size + tid] =
TypeConvertFunc
sum += (value_index != std::numeric_limits
model.add(hugectr.Input(label_dim = 1, label_name = label,
dense_dim = 13, dense_name = dense,
data_reader_sparse_param_array =
[hugectr.DataReaderSparseParam(wide_data, 30, True, 1),
hugectr.DataReaderSparseParam(deep_data, 2, False, 26)]))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
workspace_size_per_gpu_in_mb = 23,
embedding_vec_size = 1,
combiner = sum,
sparse_embedding_name = sparse_embedding2,
bottom_name = wide_data,
optimizer = optimizer))
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash,
workspace_size_per_gpu_in_mb = 358,
embedding_vec_size = 16,
combiner = sum,
sparse_embedding_name = sparse_embedding1,
bottom_name = deep_data,
optimizer = optimizer))
"------------------------------------------------------------------------------------------------------------------n",
"Layer Type Input Name Output Name Output Shape n",
"------------------------------------------------------------------------------------------------------------------n",
"DistributedSlotSparseEmbeddingHash wide_data sparse_embedding2 (None, 1, 1) n",
"DistributedSlotSparseEmbeddingHash deep_data sparse_embedding1 (None, 26, 16)
0x05 Reduce Scatter
// do reduce scatter
// 做了之后,数据才是完整的,每个gpu上分到完整数据的一部分
size_t recv_count = embedding_data_.get_batch_size_per_gpu(is_train) *
embedding_data_.embedding_params_.slot_num *
embedding_data_.embedding_params_.embedding_vec_size;
functors_.reduce_scatter(recv_count, embedding_feature_tensors_,
embedding_data_.get_output_tensors(is_train),
embedding_data_.get_resource_manager());
template void SparseEmbeddingFunctors::reduce_scatter
* 1) forward
* sum: calling forward_sum_kernel()
* mean: calling foward_sum_kernel() + forward_scale_kernel()
// scale for combiner=mean after reduction
if (embedding_data_.embedding_params_.combiner == 1) {
size_t send_count = embedding_data_.embedding_params_.get_batch_size(is_train) *
embedding_data_.embedding_params_.slot_num +
1;
functors_.all_reduce(send_count, embedding_data_.get_row_offsets_tensors(is_train),
row_offset_allreduce_tensors_, embedding_data_.get_resource_manager());
// do average
functors_.forward_scale(
embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
}
6.1 AllReduce
* 40,50,10,20
* 30,50,10
* 30,20
* 10
* Will be convert to the form of:
* row offset: 0,4,7,9,10
* value: 40,50,10,20,30,50,10,30,20,10
* 40,50,10,20,30
* 30,50
* 30,20,40
* 10
* Will be convert to the form of:
* row offset: 0,5,7,10,11
* value: 40,50,10,20,30,50,10,30,20,10
template
6.2 Forward Scale
// do average
functors_.forward_scale(
embedding_data_.embedding_params_.get_batch_size(is_train),
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size, row_offset_allreduce_tensors_,
embedding_data_.get_output_tensors(is_train), embedding_data_.get_resource_manager());
// forward kernel function: this is an additional function for combiner=mean (only for Distributed
// Embedding)
template
0x07 总结


![[源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(7) ---Distributed Hash之前向传播 [源码解析] NVIDIA HugeCTR,GPU 版本参数服务器---(7) ---Distributed Hash之前向传播](http://www.mshxw.com/aiimages/31/747597.png)
