栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark 分组取 top案例

Spark 分组取 top案例

大数据中按照某个 Key 进行分组,找出每个组内数据的 topN 时,这种情况就 是分组取 topN 问题
解决分组取 TopN 问题有两种方式,第一种就是直接分组,对分组内的数据进 行排序处理。第二种方式就是直接使用定长数组的方式解决分组取 topN 问题。

  1. 数据源
湖人	24
湖人	8
凯尔特人	0
凯尔特人	21
公牛	23
公牛	1
凯尔特人	34
湖人	23
凯尔特人	88
公牛	13
湖人	6
公牛	10
  1. scala
package action

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.control.Breaks


object GroupByKeySortTest2 {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName("group")
    )
    context.setLogLevel("Error")
    context.textFile("data/data.txt")
      .map(line=>{
        val team = line.split("t")(0)
        val num = line.split("t")(1).toInt
        (team,num)
      }).groupByKey().foreach(e=>{
      
      val key = e._1
      val ite = e._2.iterator
      val top3 = new Array[Int](3)
      val break = new Breaks
      while (ite.hasNext){
        val num = ite.next()
       break.breakable{
         for (i<-0 until 3){
           if(top3(i)==0){
             top3(i)=num
             break.break()
           }else if (num>top3(i)){
             for (j<-2 until (i,-1)){
               top3(j)=top3(j-1)
             }
             top3(i)=num
             break.break()
           }
         }
       }
      }
      println(s"key = $key,num = ${top3.toBuffer}")
      
//      val key = e._1
//      val list = e._2.iterator.toList
//      val ints = list.sortWith(_>_)
//      println(s"key = $key,num = $ints")
//      if (ints.length>3){
//        for (i<-0 until 3)
//          println(s"key = $key,num = ${list(i)}")
//      }else{
//        for(num<-list)
//          println(s"key = $key,num = $num")
//      }
    })
  }
}


3. java

package action;

import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;


public class GroupByKeySortTest {
    public static void main(String[] args) {
        JavaSparkContext context = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("group")
        );
        context.setLogLevel("Error");
        context.textFile("data/data.txt")
                .mapToPair(e->{
                    String team = e.split("t")[0];
                    Integer num = Integer.valueOf(e.split("t")[1]);
                    return new Tuple2<>(team,num);
                }).groupByKey()
                .foreach(e->{
                    
                    String team = e._1;
                    Iterator ite = e._2.iterator();
                    Integer[] top3 = new Integer[3];
                    while (ite.hasNext()){
                        Integer num = ite.next();
                        for (int i = 0; i < top3.length; i++) {
                            if (top3[i]==null){
                                top3[i]=num;
                                break;
                            }else if (num>top3[i]){
                                for (int j = 2; j > i; j--) {
                                    top3[j] = top3[j-1];
                                }
                                top3[i] = num;
                                break;
                            }
                        }
                    }
                    for (Integer num:top3){
                        System.out.println("Team:"+team+",num:"+num);
                    }
                    
//                    String team = e._1;
//                    List list = IteratorUtils.toList(e._2().iterator());
//                    Collections.sort(list, new Comparator() {
//                        @Override
//                        public int compare(Integer o1, Integer o2) {
//                            return o2-o1;
//                        }
//                    });
//                    if (list.size()>3){
//                        for (int i = 0; i < 3; i++) {
//                            System.out.println("Team:"+team+",num:"+list.get(i));
//                        }
//                    }else
//                        for (Integer num:list)
//                            System.out.println("Team:"+team+",num:"+num);
                });
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653819.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号