- 为大规模数据处理而设计的快速通用的流式计算引擎
- MapReduce 的通用并行计算框架
- 支持Java、Scala、Python
- Spark Streaming
- Spark GraphX
- MLbase
- Spark SQL
- Spark 计算中间结果基于内存缓存
- MapReduce 基于 HDFS 存储
- Spark 处理数据的能力一般是 MR 的三到五倍以上
- Spark使用有向无环图来切分任务的执行先后顺序
-
Local
- 用于本地测试
-
Standalone
- Spark 自带的一个资源调度框架,它支持完全分布式
-
Yarn
- Hadoop 生态圈里面的一个资源调度框架
-
Mesos
- 资源调度框架
-
每个RDD是由若干个Partition组成
-
HashPartitioner
- 采用哈希的方式对
键值对数据进行分区 - partitionId = Key.hashCode % numPartitions
- 采用哈希的方式对
-
RangePartitioner
- 为了解决HashPartitioner所带来的分区倾斜问题
- 基于抽样的思想来对数据进行分区
- 在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢
-
弹性分布式数据集,是其最基本的抽象数据集
-
五个属性
- RDD 是由一系列的 partition 组成的
- 函数是作用在每一个 partition/split 上
- RDD 之间有一系列的依赖关系
- 分区器是作用在 (K,V) 格式的 RDD 上
- RDD 提供一系列最佳的计算位置
-
Master
- 资源管理的主节点
-
Cluster Manager
- 在集群上获取资源的外部服务
-
Worker
- 资源管理的从节点,管理本机资源
-
Application
- 基于 Spark 的用户程序,包含 driver 程序和运行在集群上的 executor 程序
-
Dirver
- 用来连接工作进程
-
Executor
- 是在一个 worker 进程所管理的节点上为某 Application 启动的一个个进程
-
Task
- 被发送到 executor 上的工作单元
-
Job
- 包含很多任务( Task )的并行计算,和 action 算子对应
-
Stage
- 一个 job 会被拆分成很多组任务,每组任务被称为 Stage
-
转换算子
-
懒加载执行
-
类别
-
filter
-
过滤符合条件的记录数
- true 保留
-
-
map
- 通过 map 中的函数映射变为一个新的元素
-
flatMap
- 先 map 后 flat
-
sample
- 根据传进去的小数按比例进行有放回或者无放回的抽样
-
reduceByKey
- 相同的 Key 根据相应的逻辑进行处理
-
sortByKey / sortBy
- 对 key 进行升序或者降序排序
-
-
-
行动算子
-
触发执行
-
一个 行动算子对应一个JOB
-
类别
- count :返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。
- take(n) :返回一个包含数据集前 n 个元素的集合。
- first :效果等同于 take(1) ,返回数据集中的第一个元素。
- foreach :循环遍历数据集中的每个元素,运行相应的逻辑。
- collect :将计算结果回收到 Driver 端。
-
-
将 RDD 持久化,持久化的单位是 partition
-
三种
-
cache
- 默认将 RDD 的数据持久化到内存中,懒执行
- rdd.cache().count() 返回的不是持久化的RDD,而是一个数值
-
persist
- 可以指定持久化的级别。最常用的是 MEMORY_onLY 和 MEMORY_AND_DISK
- 懒执行
-
checkpoint
- 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系
- 懒执行
-
-
client
-
适用于测试调试程序
-
Driver进程是应用程序的当前节点启动。在 Driver 端可以看到 task 执行的情况
-
任务流程
- client 模式提交任务后,会在客户端启动 Driver 进程。
- Driver 会向 Master 申请启动 Application 启动的资源。资源申请成功,
- Driver 端将 task 分发到 worker 端执行,启动 executor 进程(任务的分发)。
- Worker 端( exectuor 进程)将 task 执行结果返回到 Driver 端(任务结果的回收)。
-
-
cluster
-
任务流程
- cluster 模式提交应用程序后,会向 Master 请求启动 Driver 。
- Master 接受请求,随机在集群一台节点启动 Driver 进程。
- Driver 启动后为当前的应用程序申请资源。
- Driver 端发送 task 到 worker 节点上执行(任务的分发)。
- worker 上的 executor 进程将执行情况和执行结果返回给 Driver 端(任务结果的回收)。
-
应用程序使用的所有 jar 包和文件,必须保证所有的worker 节点都要有,因为此种方式, spark 不会自动上传包
-
-
client
-
同样是适用于测试,因为 Driver 运行在本地
-
Driver 会与 yarn 集群的Executor 进行大量的通信
-
任务流程
- 客户端提交一个 Application ,在客户端启动一个 Driver 进程。
- 应用程序启动后会向 RS ( ResourceManager )(相当于 standalone 模式下的 master 进程)发送请求,启动 AM ( ApplicationMaster )。
- RS 收到请求,随机选择一台 NM ( NodeManager )启动 AM 。这里的 NM 相当于 Standalone 中的 Worker 进程
- AM 启动后,会向 RS 请求一批 container 资源,用于启动 Executor 。
- RS 会找到一批 NM (包含 container )返回给 AM ,用于启动 Executor 。
- AM 会向 NM 发送命令启动 Executor 。
- Executor 启动后,会反向注册给 Driver , Driver 发送 task 到 Executor ,执行情况和结果返回给 Driver 端。
-
-
cluster
-
任务流程
- 客户机提交 Application 应用程序,发送请求到 RS ( ResourceManager ),请求启动AM ( ApplicationMaster )。
- RS 收到请求后随机在一台 NM ( NodeManager )上启动 AM (相当于 Driver 端)。
- AM 启动, AM 发送请求到 RS ,请求一批 container 用于启动 Excutor 。
- RS 返回一批 NM 节点给 AM 。
- AM 连接到 NM ,发送请求到 NM 启动 Excutor 。
- Excutor 反向注册到 AM 所在的节点的 Driver 。 Driver 发送 task 到 Excutor 。
-
主要用于生产环境中
-
不会产生某一台机器网卡流量激增的现象
-
只能通过 yarn 查看日志
-
-
join
-
leftOuterJoin
- 左外连接
-
rightOuterJoin
- 右外连接
-
fullOuterJoin
- 全连接
-
join
- K,V)join(K,W)返回(K,(V,W))
-
-
union
- 合并两个数据集。两个数据集的类型要一致
- 分区数是合并 RDD 分区数的总和
-
intersection
- 取两个数据集的交集
- 分区数为多的那个集合
-
subtract
- 取两个数据集的差集
- 分区数为前面的那个集合
-
mapPartitions
- 与 map 类似,单位是每个 partition 上的数据
-
distinct(map+reduceByKey+map
- 对 RDD 内数据去重
-
cogroup
- 当调用类型 (K,V) 和 (K,W) 的数据上时,返回一个数据集 (K,(Iterable,Iterable))
-
foreachPartition
- 遍历每个Partition
-
mapPartitionsWithIndex
- 类似于 mapPartitions ,除此之外还会携带分区的索引值
-
repartition
- 增加或减少分区。此算子会产生 shuffle
-
coalesce
- 减少分区,算子中第二个参数是TRUE,表示分区的过程中产生 shuffle
-
groupByKey
- 根据 Key 进行分组。作用在 (K,V)
-
zip
- 将两个 RDD 中的元素( KV格式/非KV格式 )变成一个 KV 格式的 RDD
-
zipWithIndex
- 该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从0开始)组合成 (K,V) 对
-
countByKey
- K,V 格式的 RDD 上,根据 Key 计数相同 Key 的数据集元素
-
countByValue
- 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数
-
reduce
- 根据聚合逻辑聚合数据集中的每个元素
形容RDD之间的关系,stage根据宽窄依赖进行切分
窄依赖- 目的地一对一或多对一
- 传输过程需要Shuffle
- 目的地多对多或一对多
- 需要Shuffle
- RDD 之间的宽窄依赖
- 遇到宽依赖就划分 stage
- 每个 stage 包含一个或多个 task 任务
- stage 是由一组并行的 task 组成
- 从后往前,遇到宽依赖就切割 stage
- 1.从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
- 2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的;
- 3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask;
- 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤
-
pipeline 管道计算模式, pipeline 只是一种计算思想、模式。
-
在spark中pipeline是一个partition对应一个partition,所以在stage内部只有窄依赖
-
数据落地
- RDD 进行持久化
- shuffle write 的时候
-
Stage 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定
-
使用算子时传递 分区num参数 就是分区 partition 的数量
-
在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。
-
优点
- 每一个 task 直接使用资源
- task 启动快–> application 执行就快了
-
缺点
- 最后一个 task 执行完才释放资源,无法充分利用
-
Application 执行之前不需要先去申请资源,而是直接执行,让 job 中的每一个 task 在执行前自己去申请资源, task 执行完成就释放资源。
-
优点
- 集群的资源可以充分利用
-
缺点
- task 自己去申请资源, task 启动变慢, Application 的运行就响应的变慢了
- 节省了通信的成本和服务器的资源
- 加快数据传输速度
- val list = List(“hello yjx”)
- val broadCast = sc.broadcast(list)
- 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
- 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
- 一种调优方式
- 为了让一个变量被改变时在driver端进行全局汇总
- task运行的只是原始变量的一个副本,并不能改变原始变量的值
- 当这个变量被声明为累加器后,该变量就会有分布式计数的功能
- val sc = new SparkContext(conf)
- val accumulator = sc.longAccumulator
- accumulator.add(1)
- 累加器在 Driver 端定义赋初始值,Driver 端读取,在 Excutor 端更新
- 累加器不是一个调优的操作,因为如果不这样做,结果是错的
-
一个stage–>另一个stage,中间数据转换过程
-
Shuffle Write
- 上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中
-
Shuffle Read
- reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理
-
HashShuffle
-
普通机制
- 上游产生数据的时候根据下游的partition生成文件
- 文件数:MapTask * ReduceTask
-
合并机制
- 根据下游partition数,每个Executor 创建一批文件
- 文件数:MapExectur * ReduceTask
-
这两种方式都会产生很多小文件,合并机制虽然减少了很多文件
-
-
SortShuffle
-
基本和MapReduce思想相同
-
普通机制
- map产生中间结果写出到KVBuffle
分区排序–溢写–合并 - 文件数:MapTask * 2
- map产生中间结果写出到KVBuffle
-
bypass机制
- shuffle reduce task 的数量小于200
- 不需要进行 map 端的预聚合,比如 groupBykey , join
- 文件数:MapTask * 2
-



