假设这些信息都存存储在一个文件里
时间数 省份 城市 用户 广告
如下所示:
(中间字段使用空格隔开)
(1)数据转换为键值对的形式(省份-广告,点击数)
(2)计算相同key的点击数和(省份-广告,点击数和)
(3转换key的结构(省分广告,点击数和) => (省份,(广告,点击数和) )。
(4)按照key聚合。
(5)对值进行排序。
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;
public class ProviceAD {
public static void main(String[] args) {
// TODO Auto-generated method stub
SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD filedata=sc.textFile("file:///home/gyq/eclipse-workspace/ALS/PeopleInfo.txt");
JavaRDD filedatafi=filedata.filter(f->{
String arr[]=f.split(" ");
if(arr.length==5)
return true;
return false;
});
JavaRDD data1=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[1]).iterator());
JavaRDD data2=filedatafi.flatMap(f->Arrays.asList(f.split(" ")[2]).iterator());
JavaPairRDD,Integer> PAdata=filedatafi.mapToPair(s->{
return new Tuple2,Integer>(
new Tuple2(s.split(" ")[1],s.split(" ")[4]),1);
});
// PAdata.foreach(f->System.out.println(f));//第一题
JavaPairRDD,Integer> ggs=PAdata.reduceByKey((x,y)->x+y);
//ggs.foreach(f->System.out.println("(省份,"+ "广告),点击数"+f));//第二题
JavaPairRDD> pairrdd1=ggs.mapToPair(f->{
return new Tuple2>(
Integer.valueOf(f._1._1),new Tuple2(f._1._2,f._2()));
});
//pairrdd1.foreach(f->System.out.println("省份"+ ",(广告,点击数)"+f));//第三题
JavaPairRDD>> gg=pairrdd1.groupByKey();
//gg.foreach(f->System.out.println("按照key聚合"+f._1+"------"+f._2));//第四题
JavaRDD rdd=gg.map(f->{
return Integer.valueOf(f._1)+" "+f._2;
});
JavaRDD SortresultRDD = rdd.sortBy(f->{return f.split(" ")[0];},true,1);//fun5.1排序2
//SortresultRDD.foreach(f->System.out.println(f));
JavaPairRDD>> gss=gg.sortByKey(true,1);//fun5.2排序1
//gss.foreach(f1->System.out.println("ppp"+f1));//第五题
JavaRDD gss1=rdd.sortBy(f->{return f.split(" ")[0];},false,1);//
SortresultRDD.foreach(f->System.out.println(f));//第5题
}
}



