栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark基础理论及优化思路(二)

spark基础理论及优化思路(二)

文章目录

Spark工作流程Spark三种提交模式Spark on yarn 的调度RDD 阶段划分RDD任务划分

RDD任务切分中间为:Application、Job、Stage、Task RDD持久化

RDD Cache 缓存RDD CheckPoint检查点缓存与检查点区别 什么是RDDreduceBykey与groupByKey的区别

从shuffle的角度功能角度 RDD的五大特点如何使用Spark实现TopN的获取?Spark shuffer原理Spark内存溢出问题

内存溢出的场景:解决方案: SparkStreaming从Kafka里面如何消费数据?简述SparkStreaming窗口函数的原理?SparkStreaming默认分区数?Spark性能调优

常规性能调优

常规性能调优一:最优资源配置常规性能调优二:RDD优化常规性能调优三:并行度调节常规性能调优四:广播大变量 Spark算子调优Shuffle调优 Spark数据倾斜SparkSQLSparkStreaming精准一次消费?

Spark工作流程

构建Spark application运行环境(启动SparkContext),SparkContext向资源管理器注册申请运行Executor资源资源管理器分配Executor并启动,Executor运行情况将随着心跳发送到资源管理器上SparkContext构建成有向无环图,将向无环图分解成Stage,并把Taskset发送给任务调度器。Executor向SparkContext申请任务,任务调度器将任务发放给Executor运行,同时SparkContext将应⽤用程序代码发放给Executor。Task在Executor上运行,运行行完毕释放所有资源。 Spark三种提交模式

Spark内核架构 standlone

yarn-cluster

.yarn-client

Driver端启动SparkSubmit进程,启动后开始向Master进行通信,此时创建了一个对象(SparkContext),接着向Master发送任务消息Master接收到任务信息后,开始资源调度,此时会和所有的Worker进行通信,找到空闲的Worker,并通知Worker来拿取任务和启动相应的ExecutorExecutor启动后,开始与Driver进行反向注册,接下来Driver开始把任务发送给相应的Executor,Executor开始计算任务 Spark on yarn 的调度

spark任务提交后会与ResourceManager通信申请启动Application masterResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是DriverApplicationMaster启动后向ResouceManager申请Exector内存,ResourceManager接到ApplicationMaster的资源申请后会分配container然后在合适的NodeMananger上启动Eexcutor进程Eexcutor进程启动后会向Driver反向注册,Executor全部注册完成后,Driver开始执行主函数主函数执行到Action算子时,会触发一个job,并根据宽依赖开始划分Stage,每个Stage生成对应的Task,之后将task分发到各个Executor上执行 RDD 阶段划分

一个shuffer依赖会创建一个shuffer阶段阶段的总数量= shuffler依赖(宽依赖)数量 + 1 (ResultStage)ResultStage是整个流程的最后一个阶段 RDD任务划分 RDD任务切分中间为:Application、Job、Stage、Task

Application:初始化一个SparkContext即生成一个ApplicationJob:一个Action算子就会生成一个JobStage:Stage等于宽依赖(ShufferDenpendency)的个数加1Task: 一个Stage阶段中,最后一个RDD的分区个数就是Task的个数 RDD持久化 RDD Cache 缓存

Cache与persist

RDD的Cache方法其实调用的就是persist(缓存策略默认为缓存在内存中)缓存的位置是jvm堆内存 RDD CheckPoint检查点

将RDD的中间结果写入磁盘缓存与检查点区别

Cache缓存只是将数据保存起来,不切断RDD血缘关系,CheckPoint检查点切断血缘关系Cache缓存的数据通常存储在内存,磁盘等地方,可靠性低,CheckPoint的数据通常存储在hdfs等容错。高可用的文件系统 什么是RDD

弹性分布式数据集,是Spark中最基本的数据处理模型。

弹性

存储弹性:内存与磁盘自动切换容错的弹性:数据丢失可以自动恢复 分布式:数据存储在大数据集群不同节点上 reduceBykey与groupByKey的区别 从shuffle的角度

		都存在shuffle操作,reducebykey在map端有预聚合,groupbykey只是分组不存在数据量减少的问题,
		reducebykey性能比较高
功能角度
		 需求是分组聚合用reducebykey,仅仅分组使用groupbykey。

groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。reduceBykey在Mapper端对每个分区的key预先进行一次合并,类似于mapreduce当中的combiner归约,之后reducer端再把合并后的数据拉取过来,这样做的好处就是减少了mapper端到reducer端的数据量传输,提高了IO性能,也就提高了效率 RDD的五大特点

RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性) 如何使用Spark实现TopN的获取?

方法1: 按照key对数据进行聚合,将value转换为数组,利用scala的sortby或者sortwith进行排序
(mapvalues)数据量太大,会OOM。方法2:自定义分区器,按照key进行分区,使不同的key进入到不同的分区,对每个分区运用spark排序算子 进行排序 Spark shuffer原理

SparkDAG在调度阶段将job划分为多个stage,

上游stage做map操作,每个maptask将计算结果分成多份,每份对应到下游stage的每个partition中,并将临时结果写到磁盘,这个过程叫做shufflerwrite,除了逻辑计算外,还会产生序列化,磁盘io下游stage做reduce操作,每个reduce task通过网络拉取指定分区结果数据,这个过程叫shuffer read,除了逻辑计算外,还会产生反序列化和磁盘io Spark内存溢出问题 内存溢出的场景:

map过程产生大量对象导致内存溢出数据不平衡导致内存溢出coalesce调用导致内存溢出shuffle后内存溢出standlone模式下资源分配不均匀导致内存溢出在RDD中,共用对象能够减少OOM的情况 解决方案:

使用mapPartitions代替大部分map操作,或者连续使用map的操作broadcast join 和 普通的join先filter再joinpartitionby优化combinebykey的使用参数优化 SparkStreaming从Kafka里面如何消费数据?

基于Receiver的方式基于Direct的方式
区别:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。 简述SparkStreaming窗口函数的原理?

窗口函数就是在原来定义的SaprkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。 SparkStreaming默认分区数?

SparkStreaming默认分区个数与所对接的kafka topic分区数一致,SparkStreaming里一般不会使用 repartition算子增大分区,因为repartition会进行shuffle,增加耗时。 Spark性能调优 常规性能调优 常规性能调优一:最优资源配置

	 --num-executors 配置Executor 的数量
	 --driver-memory 配置Driver内存
	 --executor-cores 配置每个Executor的CPU core 数量
	 --exector-memory 配置每个Executor的内存大小

Spark-Yarn模式

根据Spark作业要提交到的资源队列,进行资源的分配

例如:资源队列有400G内存,100个CPU core 那么指定50个Executor,每个Executor分配8G内存,两个CPU core增加Executor个数:增加Exector的个数可以提高执行task的并行度4个executor 每个executor有2个core 可以并行执行8个task任务,如果增大到8个executor,并行执行的任务可以达到16个,并行能力提高一倍 增加ExecutorCPU核心的个数增加Executor的内存量:

RDD可以缓存更多的数据,写入磁盘的数据相应减少,减少磁盘io为shuffle操作提供更多内存,有更多空间来存放reduce端拉取的数据,减少磁盘io为task的执行提供更多内存,task运行时会创建很多对象,增加内存,避免频繁GC提升整体性能 常规性能调优二:RDD优化

RDD复用:避免相同的算子和计算逻辑之下对RDD进行重复的计算RDD持久化:cache或checkPoint

内存不足可以考虑进行序列化 RDD尽可能早的filter操作 常规性能调优三:并行度调节

调节各个Stage的task数量

例如20个EXecutor,每个Executor分配3个CPU core ,spark作业有40个task,导致每个executor有一个core的资源浪费Spark官方推荐,task数量应该设置为spark作业总cpu core 数量的两倍 常规性能调优四:广播大变量

将task中算子使用的外部变量,广播出去 Spark算子调优

算子调优一:MapPartition

与map区别是MapPartition对分区内数据进行单独计算,map是一个分区一个分区计算

算子调优二:filter与coalesce的配合使用

filter过滤后分区间的数据不均衡,可以使用coalesce重新进行分区处理repatition解决SparkSql低并行度问题SparkSQL执行完后立即进行repartition重新分区 sparksql无法直接调节分区

算子调优二:reduceBykey预聚合

reducebykey相比普通的shuffle操作在map端会进行本地的预聚合,减少reduce端网络传输的数据量,提升性能groupBykey 不会进行map端的聚合,而是所有map端shuffler到reduce端然后再reduce端进行聚合,而reducebykey有map端预聚合功能,效率要明显高于groupbykey Shuffle调优

调节map端缓冲区大小(在提交任务时在Sparkconf里面添加)

相当于提高reduce端每次能拉取数据的大小 调节reduce端缓冲区大小

相当于提高每次从map端拉取数据的大小 调节reduce端拉取数据重试次数 (相当于增加拉取失败重试的次数)调节reduce端拉取数据等待间隔 (相当于增加拉取失败重试的时间间隔) Spark数据倾斜

主要产生原因:
shullfer过程中由于不同的key对应的数据量不同,导致少数task被分配的数据量过大,运行缓慢。定位数据倾斜问题:

查阅代码中的shuffler算子,例如reducebykey、countBykey、groupBykey,join等算子,根据代码逻辑判断此处是否会出现数据倾斜查看Spark作业的日志文件,根据异常定位到代码位置,明确错误发生在第几个Stage,对应的shuffler算子是哪一个 解决方案:

避免shuffler过程

如果spark作业的数据来源是Hive表,可以先在hive表中对数据进行聚合,例如按照key进行分组,将相同key对应的所有value用一种特殊的格式拼接到字符串中,这样一个key就只有一条数据了;之后对一个key所有的value进行处理时,只需进行map处理就可以了 提高shuffler操作中的reduce并行度

reduce端并行度提高就增加了reduce端task的数量,那么每个task 分配到的数据量就会相应减少 SparkSQL

RDD、Datafream、Dataset之间的关系和转化

宏观角度:
RDD是弹性分布式数据集;
Datafream是在RDD之上多了一层schema
Dataset是在Datafream之上多了一个数据类型 微观角度:
优点:

编译时安全,编译时侯可以检查类型是否安全

面向对象的编程风格,可以通过直接点的方式对数据进行操作

缺点:

序列化和反序列化消耗资源太大,反序列化时,会将数据结构和数据内容都反序列化GC操作频繁,RDD要频繁的创建和销毁,务必会产生很多的GC操作Datafream 在RDD之上引入schema和off-heap,多个RDD的每一行的数据结构都一致,spark就可以通过schema来识别数据的结构,在反序列化时,可以只反序列化数据而结构就可以省略。Dataset 综合了RDD和Datafream的优点,并引入encodeing,数据在进行序列化时, encodeing出来的字节码和off-heap互通,这样就可以做到按需读取数据,而不需要将所有的数据反序列化。三者转换:
RDD–>df = toDF
RDD–>ds = toDS
df–>ds = AS[类型]
ds–>df = toDF
ds–>RDD = .rdd
dr–>RDD = .rdd SparkStreaming精准一次消费?

手动维护偏移量处理完业务数据后,在进行提交偏移量操作在极端情况下,如在提交偏移量时,断网或断电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事务保证精准一次消费。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784172.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号