Spark常用算子之行为算子
foreach
// foreach 没有返回值 会触发job
// 需要接收一个函数f:参数为RDD中的泛型,返回值类型为Unit
// 1、读取students、scores数据
val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt")
// foreach 没有返回值 会触发job
// 需要接收一个函数f:参数为RDD中的泛型,返回值类型为Unit
stuRDD.foreach(println)
count
// count 统计RDD中的数据条数
// count 统计RDD中的数据条数
println(stuRDD.count())
collect
// collect 将RDD中的数据转换成scala中的数组
// 使用时注意数据量的大小
// collect 将RDD中的数据转换成scala中的数组
// 使用时注意数据量的大小
val stuArr: Array[String] = stuRDD.collect()
val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009"))
val blacklistArr: Array[String] = blackListRDD.collect()
stuRDD.filter(line => {
// 在一个RDD中不能直接使用另一个RDD
// blackListRDD.collect().contains(line.split(",")(0))
// 如果遇到了需要RDD中套另一个RDD的情况
// 可以换一种思路去实现
blacklistArr.contains(line.split(",")(0))
}).foreach(println)
reduce
// 传入一个聚合函数
// select sum(age) from students group by 1
// 全局的聚合(将所有数据作为一个组进行聚合)
// reduce
val sumAge: Int = stuRDD
.map(line => line.split(",")(2).toInt)
// 传入一个聚合函数
// select sum(age) from students group by 1
// 全局的聚合(将所有数据作为一个组进行聚合)
.reduce((i, j) => i + j)
println(sumAge)
lookup
// lookup 作用在K-V格式的RDD上,传入一个Key,返回所有与之匹配的Key对应的value
val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0)))
.lookup("尚孤风")
println(ids)
take
// take 传入一个Int类型的值,从RDD中取多少条数据返回并构建Array
// take 传入一个Int类型的值,从RDD中取多少条数据返回并构建Array
val stuArr2: Array[String] = stuRDD.take(10)
// 这里的foreach不再是RDD的算子,而是Array的方法
stuArr2.foreach(println)
完整代码
package com.xiaoming.core
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo14Action {
def main(args: Array[String]): Unit = {
// 构建Spark上下文环境
val conf: SparkConf = new SparkConf()
conf.setAppName("Demo14Action")
conf.setMaster("local")
val sc: SparkContext = new SparkContext(conf)
// 1、读取students、scores数据
val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt")
// foreach 没有返回值 会触发job
// 需要接收一个函数f:参数为RDD中的泛型,返回值类型为Unit
stuRDD.foreach(println)
// count 统计RDD中的数据条数
println(stuRDD.count())
// collect 将RDD中的数据转换成scala中的数组
// 使用时注意数据量的大小
val stuArr: Array[String] = stuRDD.collect()
val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009"))
val blacklistArr: Array[String] = blackListRDD.collect()
stuRDD.filter(line => {
// 在一个RDD中不能直接使用另一个RDD
// blackListRDD.collect().contains(line.split(",")(0))
// 如果遇到了需要RDD中套另一个RDD的情况
// 可以换一种思路去实现
blacklistArr.contains(line.split(",")(0))
}).foreach(println)
// reduce
val sumAge: Int = stuRDD
.map(line => line.split(",")(2).toInt)
// 传入一个聚合函数
// select sum(age) from students group by 1
// 全局的聚合(将所有数据作为一个组进行聚合)
.reduce((i, j) => i + j)
println(sumAge)
// saveAsTextFile 将结果保存到文件
// stuRDD.saveAsTextFile("data/stu/newStu.txt")
// lookup 作用在K-V格式的RDD上,传入一个Key,返回所有与之匹配的Key对应的value
val ids: Seq[String] = stuRDD.map(line => (line.split(",")(1), line.split(",")(0)))
.lookup("尚孤风")
println(ids)
// take 传入一个Int类型的值,从RDD中取多少条数据返回并构建Array
val stuArr2: Array[String] = stuRDD.take(10)
// 这里的foreach不再是RDD的算子,而是Array的方法
stuArr2.foreach(println)
}
}