交互的流程图
说明
- 以提交到yarn集群为例:部署方式为 client
1- 由执行的spark-submit脚本提交任务,会在当前这个节点根据提交的信息启动一个Driver程序, 由这个Driver程序向yarn的主节点提交任务操作,yarn会认为这个任务启动一个applicationmaster程序,后续与任务相关的操作都找这个applicationmaster程序即可(任意节点的都可以启动和Driver不在一个节点)
2- applicationmaster会根据executor资源信息行yarn的主节点申请源,用于启动executor
3- 当applicationmaster拿到用于启动executor的资源后,通知相应的从节点启动executor执行器即可,当对应节点的执行器启动后,反向响应给Driver程序
4- Driver程序就会开始进行任务的执行流程图生成,此时会产生一个DAG的有向无环图,执行各个stage节点,以及划分区数的操作
4.1:先根据py4j读取spark程序中的代码,创建 sparkcontext对象
4.2:检查后续一共使用到哪些算子,再根据这些算子划分stage阶段(每个节点划分为几个区,对应有几个线程执行),并生成DAG执行流程图
4.3:通知executor进行执行对应的任务操作即可
4.4:监控各个executor执行进度,等待执行完成,关闭sparkcontext对象,释放资源,通知applicationmaster已经执行完成
5- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可
6- AppMaster收到Driver程序处理完成的信息后, 通知yarn的主节点, 任务执行完成, 回收资源并关闭整个任务
- 以提交到yarn集群为例:部署方式为 cluster
跟部署方式为 client相对比 第1- 步不同:由执行的spark-submit脚本提交的任务给 resourcemanager(yarn的主节点),yarn会为这个任务启动一个applicationMaster程序,后续与任务相关的操作都找这个applicationMaster即可 (任意某个从节点) 第2- 步不同:appMaster会根据任务的信息分别启动Driver程序(让自己同时升级为Driver程序),然后根据executor资源信息向yarn主节点申请资源, 用于启动executor
- 以提交到spark集群为例 部署方式为 client
1- 由执行spark-submit脚本提交任务, 会在当前这个节点根据提交信息启动一个Driver程序, Driver程序会根据资源信息,向Master申请资源, 用于启动executor
2- 当Driver拿到用于启动executor的资源后, 通知相对应的从节点启动executor执行器即可, 当对应节点执行器启动后, 反向响应给Driver程序, 已经启动好了
3- Driver程序就会开始进行任务的执行流程图生成. 此时就会产生DAG执行流程图, 划分各个执行stage节点, 以及划分分区数等操作:
3.1:首先根据py4j 读取spark程序中代码,创建sparkContext对象
3.2:检查后续一共使用到那些算子, 根据这些算子划分stage阶段(每个节点划分为几个分区, 对应有几个线程执行), 并生成DAG执行流程图
3.3:通知各个executor进行执行对应任务操作即可
3.4:监控各个executor执行进度, 等待执行完成, 关闭sparkContext对象, 释放资源, 通知Master , 任务已经执行完成
4- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可
5- master收到任务处理完成的信息后, 回收资源即可
- 以提交到spark集群为例 部署方式为 cluster
与部署方式为 client对比 第1- 步不同:由执行spark-submit脚本提交任务到 spark集群的Master节点, Master会根据提交信息, 在某一个worker节点上, 启动一个Driver程序 。。。。。。。。。。 剩下的步骤相同spark-submit想关的参数
spark-submit.sh脚本的作用
-
用于将spark程序提交到指定的资源调度平台上进行运行,并且在提交过程中,可以对资源设置相关的配置信息
-
基本参数
--master :用于指定提交到那个资源调度平台 (可选择: local| spark | yarn ....) -- deploy-mode: 用于指定提交部署方式(可选择: client 和 cluster) --conf : 用于设置相关配置信息 python-file : 指定spark的python脚本 args: 添加程序入口参数, 如果没有参数是可以不配置的spark-core的内容(核心部分)
RDD的基本介绍
- MR的计算过程
- RDD的计算过程
- 背景说明
1)在早期的计算模型: 单机模型 比如: pandas , mysql 依赖于单个节点的性能 适用于: 少量数据集统计分析的处理 在计算过程中,数据都是在一个进程中的,不断地进行迭代计算操作 2)当数据量大了以后, 单机的这种计算的模式就无法支撑了,此时需要分布式的计算的模型 核心:让多个节点参与计算, 将计算任务进行划分, 将这个部分交给各个节点进行运行, 运算后, 将结果进行汇总 比如:MapReduce, spark MapReduce计算的模型: 在计算过程中, 每一个MR都是有两部分组成: map 和 reduce,在计算过程中, 需要将数据从磁盘读取内存中, 从内存落入磁盘, 再从磁盘读取到内存中, 这样导致整个IO变大, 不断的与磁盘进行交互, 整个执行效率 也是比较低的 由于一个MR只有map和reduce节点, map进行分布式计算, 计算reduce汇总统计, 如果需要进行多次的分布式计算和多次聚合统计(迭代计算), 对于MR来说, 必须使用多个MR进行串行执行了,而这样的执行导致每一个MR都需要重新申请资源, 回收资源, 大量的时候都消耗在了资源申请和回收上, 而且这种操作中间结果只能保存在磁盘中, 导致效率比较低 正因为有了这个MR问题后, 此时想办法解决问题,解决问题思路: 1) 是否可以让中间结果都保存在内存中, 这样效率是不是就比较高了 2) 是否可以在一个程序中完成多次的不断迭代计算操作
什么是RDD
RDD:弹性的分布式数据集 RDD的目的:主要用于支持更加高效的迭代计算
- RDD的五大特性
注:1,2,3是必须有的 4,5可选 1- 可以分区的:每一个分区对应了一个task线程 2- 计算函数: 对每一个分区进行计算操作 3- 存在依赖关系 4- 对于k-v数据存在分区计算函数 5- 移动数据不如移动计算(将计算程序运行在离数据越近越好)
- RDD的五个特点
1- 可分区的:分区是抽象定的分区,仅仅是定义分区的信息规则
2- 只读的特性:一个RDD对象中数据是不可变的
3- 依赖: RDD与RDD之间是存在依赖关系的:依赖的关系越长,整个血缘关系越长,血缘关系越长重新计算的代价越大
依赖关系可以分为:宽依赖的窄依赖
4- 缓存:当需要对一个RDD的结果进行重复使用的时候, 可以将这个RDD的计算结果缓存起来, 减少后续重新计算的资源和时间消耗
5- checkpoint: 检测点
当依赖链条比较长的时候, 如果其中一个算子计算失败, 重新计算一次的代价是很大的, 需要重新将整个依赖关系全部重塑, 非常耗费资源, 可以同检测点将血缘关系打断, 在对应打断点上记录当前结果数据, 这样后续即使使用了, 也不需要重塑全部依赖流程 提升容错能留
如何获取RDD对象
- 构建RDD对象的方式主要有两种:
- 第一种通过 parallelize()来构建
from pyspark import SparkContext,SparkConf
import os
# 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"
if __name__ == '__main__':
print("构建RDD对象: 方式一演示")
# 1) 构建sparkContext对象
# local[*]: * 表示当前环境中的cpu是几核的, * 就表示运行多少个线程
conf = SparkConf().setMaster("local[*]").setAppName("init0")
sc = SparkContext(conf=conf)
#2) 通过 paralleleize 获取RDD对象
# 通过初始化数据集方式来构建一个RDD对象
rdd = sc.parallelize(["张三", "李四", "王五", "赵六", "田七"],6)
#rdd = sc.parallelize(range(10))
rdd2 = rdd.map(lambda name: (name, 1))
#3) 打印这个RDD对象中数据
# getNumPartitions() 是用于获取当前这个rdd有多少个分区数
# glom() : 获取每一个分区的数据
print(rdd2.getNumPartitions())
print(rdd2.glom().collect())
- 第二种通过加载外部数据源的方式构建
from pyspark import SparkContext, SparkConf
import os
# 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
if __name__ == '__main__':
print("演示构建RDD的第二种方式: 加载外部数据集")
# 1) 创建 sparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("init1")
sc = SparkContext(conf=conf)
#2) 构建RDD对象: 加载外部数据集
# 请问 : 这里的写的路径对不对? 1 2
# 注意: file:/// 本地路径 指的是 linux的本地路径 不是windows本地路径
# 注意:
# 在加载外部数据集, 如果数据是本地路径,文件有多少个, 自动划分多少个分区数, 如果加载HDFS, 有多少个block 对应就有多少个分区
# 注意: 如果文件的分区数量小于了 local[*]的数量, 依然会以local[*]数量为标准
rdd = sc.textFile("file:///export/data/workspace/_02_pyspark_core/data/")
# 3) 打印 rdd数据
print(rdd.getNumPartitions())
print(rdd.glom().collect())
- 说明
1) 默认情况下, 分区数量取决于 Master参数设置, 以及linux服务器的cpu的数量设置 2) 支持手动设置数据的分区数量: parallelize(初始数据集, 分区数量) 3) 如何获取分区数量:getNumPartitions() 4) 如果获取每一个分区下的数据: glom().collect()
- 如何处理大量小文件,避免开启太多的分区
在用对象获取文件的时候用wholeTextFiles这个算子,可以避免多个文件都进行分区,避免造成资源的浪费
rdd = sc.wholeTextFiles("file:///export/data/workspace/_02_pyspark_core/data")
RDD算子操作
RDD算子的分类
1)一类是转换算子(transformastion) 1- 会返回一个新的RDD对象 2- 所有的转换算子函数都是lazy(惰性),不会立即执行,只有遇到了动作算子函数触发 3- 不负责数据的存储,仅仅是为了定义计算的规则 2)一类是动作算子(action) 1- 不会返回RDD,无返回值(saveAsTextFile),或直接返回计算结果(collect) 2- 会立即执行生成一个DAG的有向无环图执行任务,一个spark程序中有多少个动作算子,也就代表着有多少个任务 如:count, first, take,collect
算子的介绍
- 转换算子
1)值类型转换算子:数据类型只有value 或者说算子只对value对处理
#map算子
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据, 每一个数据都新增+1操作
rdd = sc.parallelize(range(10)) # 初始化了 0~9的列表数据集
rdd2 = rdd.map(lambda num: num+1)
rdd2.collect()
结果为:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#----------------------------------------------
#groupby算子
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据,将其中 偶数分为一组 奇数分为一组
rdd = sc.parallelize(range(10)) # 初始化了 0~9的列表数据集
rdd2= rdd.groupBy(lambda num: 'o' if(num%2 == 0) else 'j' )
rdd2.collect()
[('o', ), ('j', )]
>>> rdd2.mapValues(list).collect()
[('o', [0, 2, 4, 6, 8]), ('j', [1, 3, 5, 7, 9])]
#--------------------------------------------------
#mapvalues算子
对key value中value数据执行map操作, 对其一对一转换操作
#--------------------------------------------------
#filter算子
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据, 请将 >5的数据过滤掉
rdd = sc.parallelize(range(10)) # 初始化了 0~9的列表数据集
rdd2 = rdd.filter(lambda num: num > 5)
rdd2.collect()
结果:
[6, 7, 8, 9]
#---------------------------------------------------
#flatmap算子
需求: 将 姓名信息 进行切分, 然后将其所有姓名放置在一个列表中, 一个姓名就是一个列表元素
希望结果为: ["张三,老张,老王,老李,李四,周八,李九,王五,赵六,田七"]
rdd = sc.parallelize(["张三 老张 老王 老李","李四 周八 李九","王五 赵六 田七"])
rdd2 = rdd.flatMap(lambda names: names.split(" "))
rdd2.collect()
结果为:
['张三', '老张', '老王', '老李', '李四', '周八', '李九', '王五', '赵六', '田七']
2)双值类型的转换算子
#union 计算并集(不会去重) #intersection计算交集 rdd1 = sc.parallelize([1,2,3,4,5]) rdd2 = sc.parallelize([6,7,3,4,9]) 分别计算其交集和并集: rdd3 = rdd1.union(rdd2) rdd4 = rdd1.intersection(rdd2) rdd3.collect() 结果: [1, 2, 3, 4, 5, 6, 7, 3, 4, 9] rdd4.collect() 结果 [3, 4]
3)key-values类型的转换算子
#groupBykey 将相同的key进行分组操作,将相同的key的values进行合并形成一个列表
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])
需求; 根据key分组, 求出每个班级人员
rdd2 = rdd.groupByKey()
rdd2.mapValues(list).collect()
结果:
[('c01', ['张三', '老王']), ('c02', ['老张', '李四', '周八', '赵六']), ('c03', ['老李', '王五', '田七']), ('c04', ['李九'])]
#---------------------------------------------------
#reduceBykey 根据key进行分组,求每个班级有多少个人
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])
需求; 根据key分组, 求出每个班级有多少个人
rdd2 = rdd.map(lambda x:(x[0],1)).reduceByKey(lambda agg,curr : agg+curr)
rdd2.collect()
结果:
[('c01', 2), ('c02', 4), ('c03', 3), ('c04', 1)]
#-----------------------------------------------
#sortByKey 根据key进行排序操作,默认是升序排序
#需求: 假设有一下数据, 请按照key进行倒序排序
rdd = sc.parallelize([(2,'c01'), (4,'c02'), (3,'c03'), (1,'c04')])
rdd2 = rdd.sortByKey(ansending=False).map(lambda num: (num[1],num[0]))
print(rdd2.collect())
#----------------------------------------------------
#countByValues 根据values进行统计操作
rdd = sc.parallelize([1,2,3,1,2,2,1,4,5,2,3])
print(rdd.countByValue())
defaultdict(, {1: 3, 2: 4, 3: 2, 4: 1, 5: 1})



