栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

9.2.4、Spark Core

9.2.4、Spark Core

1、创建RDD的方式

(1)parallelize:从集合中创建RDD
(2)makeRDD:从集合中创建RDD(底层还是parallelize)(第二个参数:分区的数量)
(3)textFile的方式读取文件创建RDD

2、算子

(1)转换算子(Transformations):将一个RDD变成另一个RDD,RDD之间的转换----懒执行,需要行为算子触发执行
(2)操作算子(Actions行为算子):触发任务的调度和作业的执行,不能一个RDD变成另一个 collect,take

3、算子应用

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果
(1)map
(2)flatMap
(3)mapPartition(转换算子)//按分区去处理数据
//有时需要从外部获取数据
(4)foreachPartition(转换算子)

//每一个分区由一个task去处理数据,相当于每一个分区会创建一个连接
//适用于一般需要将数据写入到外部系统
//有时在处理数据的时候需要和外部系统建立连接
//如果连接建立在Driver端,连接是不能被序列化的,及在算子内部无法使用算子外部创建的连接
//那么可以使用MapPartition,foreachPartition算子降低连接的创建与销毁次数,提高效率

(5)mapPartitionsWithIndex(转换算子):会在遍历数据的时候给分区加上一个索引index

val stuRDD: RDD[String] = sc.textFile("sparkproject/data/students.txt")
stuRDD.mapPartitionsWithIndex((index,rdd)=>{
  println("当前遍历的分区"+index)
  rdd.map(line=>line.split(",")(1))
}).foreach(println)

(6)filter(转换算子):过滤数据
(7)sample(转换算子):对数据进行取样

val stuRDD: RDD[String] = sc.textFile("sparkproject/data/students.txt")


val smaRDD = stuRDD.sample(false,0.1)
smaRDD.foreach(println)

(8)union(转换算子):将两个RDD首尾相连成一个RDD,两个RDD结构需要一样
(9)join(转换算子):关联RDD
(10)groupBy

stuRDD.groupBy(line => line.split(",")(4))
  .map(kv => (kv._1, kv._2.size))
  .foreach(println)
groupByKey:作用在key-value格式上,默认按key分组
stuRDD.map(line => (line.split(",")(4), line))
  .groupByKey()
  .map(ke => (ke._1, ke._2.size))
  .foreach(println)
reduceByKey:需要接收一个函数(聚合函数),按key分组,后面1+1...(一般加和,max,min等)	
stuRDD.map(line => (line.split(",")(4), 1))
  .reduceByKey((x: Int, y: Int) => x + y)
  .foreach(println)
reduecBykey会进行预聚合数据,而groupByKey不会,效率比groupByKey,但是功能弱`在这里插入代码片`

(11)sortBy(转换算子)

//设置参数降序
stuRDD.sortBy(line=>line.split(",")(0),ascending = false)
  .foreach(println)
sortByKey直接以key进行排序,默认升序
stuRDD.map(line=>(line.split(",")(0),line))
  .sortByKey(ascending = false)
  .foreach(println)

(12)mapValue(转换算子):作用在key-value格式上,传入一个函数,将RDD的value传给函数,key不变,数据规模也不变
(13)操作算子:foreach,take,count(返回RDD的长度)

collect:将RDD转化为数组Array(将数据拿到内存,注意内存大小)
reduce:全局聚合
saveAsTextFile:数据保存

(14)countByKey:求wordcount的,最后返回map,是一个行为算子

countByKey的源码: 思路很简单,就是先把rdd的value都变成1,
然后reduceByKey,在通过collect行动算子拉取到driver, 最后toMap转换成一个Map类型

4、拉链
相应位置的数据进行组合,拉链操作两个数据类型可以不一致
val list1RDD: RDD[Int] = sc.parallelize(List(1,2,3,4))
val list2RDD: RDD[Int] = sc.parallelize(List(5,6,7,8))
val rdd3: RDD[(Int, Int)] = list1RDD.zip(list2RDD)
rdd3.foreach(println)
5、思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582383.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号