大数据中按照某个 Key 进行分组,找出每个组内数据的 topN 时,这种情况就 是分组取 topN 问题
解决分组取 TopN 问题有两种方式,第一种就是直接分组,对分组内的数据进 行排序处理。第二种方式就是直接使用定长数组的方式解决分组取 topN 问题。
- 数据源
湖人 24 湖人 8 凯尔特人 0 凯尔特人 21 公牛 23 公牛 1 凯尔特人 34 湖人 23 凯尔特人 88 公牛 13 湖人 6 公牛 10
- scala
package action
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks
object GroupByKeySortTest2 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("group")
)
context.setLogLevel("Error")
context.textFile("data/data.txt")
.map(line=>{
val team = line.split("t")(0)
val num = line.split("t")(1).toInt
(team,num)
}).groupByKey().foreach(e=>{
val key = e._1
val ite = e._2.iterator
val top3 = new Array[Int](3)
val break = new Breaks
while (ite.hasNext){
val num = ite.next()
break.breakable{
for (i<-0 until 3){
if(top3(i)==0){
top3(i)=num
break.break()
}else if (num>top3(i)){
for (j<-2 until (i,-1)){
top3(j)=top3(j-1)
}
top3(i)=num
break.break()
}
}
}
}
println(s"key = $key,num = ${top3.toBuffer}")
// val key = e._1
// val list = e._2.iterator.toList
// val ints = list.sortWith(_>_)
// println(s"key = $key,num = $ints")
// if (ints.length>3){
// for (i<-0 until 3)
// println(s"key = $key,num = ${list(i)}")
// }else{
// for(num<-list)
// println(s"key = $key,num = $num")
// }
})
}
}
3. java
package action;
import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
public class GroupByKeySortTest {
public static void main(String[] args) {
JavaSparkContext context = new JavaSparkContext(
new SparkConf()
.setMaster("local")
.setAppName("group")
);
context.setLogLevel("Error");
context.textFile("data/data.txt")
.mapToPair(e->{
String team = e.split("t")[0];
Integer num = Integer.valueOf(e.split("t")[1]);
return new Tuple2<>(team,num);
}).groupByKey()
.foreach(e->{
String team = e._1;
Iterator ite = e._2.iterator();
Integer[] top3 = new Integer[3];
while (ite.hasNext()){
Integer num = ite.next();
for (int i = 0; i < top3.length; i++) {
if (top3[i]==null){
top3[i]=num;
break;
}else if (num>top3[i]){
for (int j = 2; j > i; j--) {
top3[j] = top3[j-1];
}
top3[i] = num;
break;
}
}
}
for (Integer num:top3){
System.out.println("Team:"+team+",num:"+num);
}
// String team = e._1;
// List list = IteratorUtils.toList(e._2().iterator());
// Collections.sort(list, new Comparator() {
// @Override
// public int compare(Integer o1, Integer o2) {
// return o2-o1;
// }
// });
// if (list.size()>3){
// for (int i = 0; i < 3; i++) {
// System.out.println("Team:"+team+",num:"+list.get(i));
// }
// }else
// for (Integer num:list)
// System.out.println("Team:"+team+",num:"+num);
});
}
}



