spark1.0有丰富的API,提供SQL查询、流式计算、机器学习、图计算,Spark可以跑在Yarn、 Mesos等多种调度平台, 也可以使用standalone模式或者上云。同时是基于内存的计算,比MR快100倍左右
RDD分为transformation和action算子,另一类特殊的算子就是cache 和 persist 算子
RDD依赖:
宽依赖:一对多
窄依赖:一对一或者多对一
workcount案例:
val words=sc.textFile("");
val lines=words.flapMap(lines=>lines.split(";"));
.map(word=>(word,1))
.reduceBykey(_+_)
.collect()
groupByKey:
val lines=words.flapMap(lines=>lines.split(";"));
.map(word=>(word,1))
.groupByKey()
.map(t=>(t._1,t_2.sum))
.collect()
经过reduceByKey就会触发shuffle的操作,即为宽依赖
同一个stage内的操作是可以串行
map算子:从一个RDD转换为一个新的RDD
flatMap算子:打平操作,把每一条数据都拆分开来
groupByKey:通过key进行分组,会进行reduce
reduceByKey和groupByKey区别:
groupByKey会产生shuffle,所以会走网络(11条记录会经过shuffle)
reduceByKey:从上到下可分为map side和reduce side两部分,中间会出发shuffle操作(spark中用exchange),在map阶段会根据相同的key进行聚合,会有6条记录走网络,经过了shuffle,节约了50%的shuffle数据量。推荐reduceByKey
collect算子:所有的记录都会放到内存里
shuffle之后进行checkpoint,可以减少读磁盘的次数,可以不需要重新做shuffle
spark1.0版本中,内存计算,使用内存代替磁盘,大大提高执行性能,同时提供丰富的API,可以支持多种部署方式,比如 YARN,MESOS,docker等,数据来源可以从Hadoop的hdfs上,Hbase,ES,MySQL,JSON或者parquet等。
RDD的分区:
一个RDD可以包含多个分区,每个分区就是一个dataset片段
RDD根据数据源分区可以分为以下三种:
Version:0.9 StartHTML:0000000105 EndHTML:0000001878 StartFragment:0000000141 EndFragment:0000001838
• HDFS files – number of partitions = number of HDFS blocks • Parquet – number of partitions = number of part files • Text files – partitioned based on block size defined by Hadoop configuration, typically means 64/128MB blocks Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。 可以根据RDD.getNumPartitions()——获取RDD分区数1、默认分区数
相关参数
sc.defaultParallelism
spark.default.parallelism配置
如果未配置spark.default.parallelism时,num-executors * executor-cores
sc.defaultMinPartitions = min(sc.defaultParallelism, 2)
HDFS文件分片数 = 文件大小 / HDFS block size(根据Hadoop版本,默认为128M或者256M)
2、RDD分片数
sc.parallelize读取scala集合生成RDD,未指定分区数,RDD分区数=sc.defaultParallism
sc.textFile读取本地文件生成RDD,未指定分区数,RDD分区数=max(本地文件分区数,sc.defaultMinPartitions)
sc.textFile读取HDFS文件生成RDD,未指定分区数,RDD分区数=max(HDFS文件分区数,sc.defaultMinPartitions)
3、重新分区
(1)RDD.coalesce(numPartitions, shuffle)
返回一个新的RDD,它被简化为numPartitions分区。属于窄依赖关系,例如,从1000个分区到100个分区,将不会有一个shuffle,而是100个新分区中的每一个都会使用10个当前分区。
当numPartitions = 1时,计算发生在1个节点上,导致并行度下降,无法充分利用分布式环境的优势。为了避免这种情况,可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
(2)RDD.repartition(numPartitions) = RDD.coalesce(numPartitions, shuffle=True)
返回一个新的RDD,该RDD恰好具有numPartitions分区。repartition这个方法可以增加或减少此RDD中的并行度,在内部,这使用shuffle来重新分配数据。
如果要减少RDD中的分区数量,可以使用“coalesce”,这样可以避免执行shuffle。
4、分区器
HashPartitioner
Spark默认的分区器
key求取hash值,再对hash值对分区数partitions 取余数,如果余数<0,那么就取“余数+partitions”,作为该row对应的分区编号,非负取模值,若为负数则mod+numPartitions转为正数
如果大部分key是相同的话将会导致,各partition之间存在数据倾斜的问题
RangePartitioner
将一定范围内的数映射到某一个分区内
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。
reduceByKey为RDD生成RangePartitioner
实现方式主要是通过两个步骤来实现的,第一步:先重整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的
自定义分区器
继承 org.apache.spark.Partitioner
实现方法
numPartitions: Int: 总分区数
getPartition(key: Any): Int: 返回给定键对应的分区编号(0 <= 且 <= numPartitions-1)
equals(): Java 判断相等性的标准方法
hashCode() : 确保 getPartition() 永远返回一个非负数。
RDD容错:
会根据RDD的血缘关系和checkpoint机制来进行容错检查,如果某个RDD的算子失败,会根据血缘关系向前寻找,找到上一级,如果运算是窄依赖。则仅仅要把丢失的父RDD分区重算就可以,不依赖于其它节点,但是如果是宽依赖的话就会计算很多的父RDD,耗费时间和性能,所以会通过checkpoint机制,checkpoint的意思就是建立检查点,类似于快照,如果将中间的计算结果通过cache或者persist放到内存或者磁盘中,这种也不能完全保证数据的完整性和不丢失,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)
可以通过sc.setCheckpointDir("")来指定checkpoint的路径
二、spark架构
(1)一个Spark程序会创建一个SparkContext
(2)将程序提交到集群管理器
(3)工作节点会根据资源需求量启动多个Executor进程。Executor进程会向调度器进行注册。
(4)Driver程序将任务分发到Executor中
一个application如果包含多个job,可能触发多个job的原因:
(1)join的时候可能会触发
(2)触发action算子的时候
Stage:
Stage(阶段)我们也可以称为“调度阶段”
Stage是在DAGScheduler中进行划分的,每一个宽依赖就会划分一个stage
分为两类Stage: ShuffleMapStage和ResultStage包含最后一个RDD的Stage为ResultStage。 然后根据getParentsStages()找出其依 赖的祖先RDD,根据其是否具有Shuffle进行划分。若存在shuffle,那么Shuffle之前 的Stage为ShuffleMapStage。



