首先来看下spark的wordcount的top5
package org.example.spark
import java.security.MessageDigest
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
import org.apache.spark.rdd.RDD
def main(args: Array[String]): Unit = {
// 这里的下划线"_"是占位符,代表数据文件的根目录
val file: String = "D:\testCode\words.txt"
// 读取文件内容
//设置spark的配置文件信息
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")
.setMaster("local[2]")
//构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
//读取文件
val lineRDD: RDD[String] = sc.textFile(file)
// 以行为单位做分词val
val words: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 过滤掉空字符串
val cleanWordRDD: RDD[String] = words.filter(word => !word.equals(""))
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 打印词频最高的5个词汇
println(wordCounts.count())
//反转kv然后sortByKey排序取出前五
val tuples = wordCounts.map {
case (k, v) => (v, k) }.sortByKey(false).take(5)
println(tuples.toString)
tuples.foreach{ line=>
println("word="+line._2+" ,num="+line._1)
}
}
}
结果
map:以元素为粒度的数据转换从原来的对单词计数,改为对单词的哈希值计数
val hashkvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
// 获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
// 使用MD5计算哈希值
val hash = md5.digest(word.getBytes).mkString // 返回哈希值与数字1的Pair
(hash, 1)
}
val hashwordCounts: RDD[(String, Int)] = hashkvRDD.reduceByKey((x, y) => x + y)
hashwordCounts.foreach{ line=>
println("word="+line._1+" ,num="+line._2)
}
mapPartitions:以数据分区为粒度的数据转换
由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。 在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录, 如果处理每条记录都需要事先创建 MessageDigest,那么实例化对象的开销就会聚沙成塔, 不知不觉地成为影响执行效率的罪魁祸首。
那么问题来了 ,有没有什么办法,能够让 Spark 在更粗的数据粒度上去处理数据呢?还真有, mapPartitions 和 mapPartitionsWithIndex 这对“孪生兄弟”就是用来解决类似的问题。 相比 mapPartitions,mapPartitionsWithIndex 仅仅多出了一个数据分区索引, 因此接下来我们把重点放在 mapPartitions 上面
// 映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多
复用一个partition内的对象
val kvpartitionRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions(
partition => {
// 注意!这里是以数据分区为粒度,获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
val newPartition = partition.map( word => {
// 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
(md5.digest(word.getBytes()).mkString,1)
})
newPartition
})
val hashpartitionCounts: RDD[(String, Int)] = kvpartitionRDD.reduceByKey((x, y) => x + y)
hashpartitionCounts.foreach{ line=>
println("word="+line._1+" ,num="+line._2)
}
结果为
mapPartitionsWithIndex:分区为粒度操作,还会有分区索引
mapPartitionsWithIndex,不过提供了两个参数,第一个参数为分区的索引,索引默认0开始,当你的业务逻辑中
val rdd = sc.makeRDD(List(12,3,4,5,6,4))
//分成两个区
val newrdd = rdd.repartition(2)
val rdd2 = newrdd.mapPartitionsWithIndex((i:Int,it:Iterator[Int])=> {
it.map(t=> s"p=$i,v=$t")
})
rdd2.collect().toBuffer.foreach(println)
需要使用到分区编号的时候,不妨考虑使用这个算子来实现代码



