// 阿巴巴阿巴巴巴 (stray placeholder text; kept as a comment so the file compiles)
package thisterm;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
public class task3_1 {
public static void main(String[] args) {
// TODO Auto-generated method stub
SparkConf conf=new SparkConf().setAppName("filter").setMaster("local[2]");
JavaSparkContext sc=new JavaSparkContext(conf);
JavaRDD data = sc.textFile("file:///home/gyq/eclipse-workspace/student/student.txt");
JavaRDD mathdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_math.txt");
JavaRDD bigdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_bigdata.txt");
JavaPairRDD prdd1=mathdata.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));
JavaPairRDD prdd2=bigdata.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));//ID加成绩
JavaPairRDD prdd3=data.mapToPair(f->new Tuple2<>(f.split(" ")[0],f.split(" ")[1]));//ID加名字
// JavaRDD sortmath=mathdata.sortBy(f->Integer.valueOf(f.split(" ")[2]), false, 1);
// List math_5=sortmath.take(5);
//bigdata.foreach(f->System.err.println(f));
// math_5.foreach(f->System.err.println(f));
//for(String aa:math_5) {
// System.out.println(aa);//数学前五名的
//}
// (2)找出单科成绩为100的学生ID
// 3)输出每位学生的总成绩,要求将两个成绩表中学生ID相同的成绩相加。
JavaRDD Sumcourse=mathdata.union(bigdata);
JavaPairRDD rdd3=Sumcourse.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));
//3 JavaRDD rdd3=Sumcourse.map(f->f.split(" ")[0]+" "+f.split(" ")[2]);
JavaPairRDD rdd4=rdd3.reduceByKey((x,y)->x+y);//ID+总成绩
// rdd4.foreach(f->System.err.println(f));//输出每位学生的总成绩
// (4)计算每一个学生的平均成绩)
JavaPairRDD rddave=rdd4.mapToPair(f->new Tuple2<>(f._1,Double.valueOf(f._2)/2));
//rddave.foreach(f->System.err.println(f));
// (5)汇总学生成绩并以文本格式存储在HDFS上,数据汇总为学生ID,姓名,大数据成绩,数学成绩,总分,平均分。)
JavaPairRDD> rdd5=rdd4.join(rddave);
JavaPairRDD> rdd6=prdd2.join(prdd1);
JavaPairRDD, Tuple2>> rdd7= rdd6.join(rdd5);
JavaPairRDD, Tuple2>>> rdd8= prdd3.join(rdd7);
rdd5.foreach(f->System.err.println(f));
rdd6.foreach(f->System.err.println(f));
rdd8.foreach(f->System.err.println(f));
}
}



