public static void main(String[] args) {
//TODO 0.env/创建环境
SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]"); //指定任务名和运行的服务器
JavaSparkContext sc = new JavaSparkContext(conf); //创建sc
sc.setLogLevel("WARN"); //设置日志级别
//TODO 1.source/加载数据/创建RDD
JavaRDD lines = sc.textFile("data/input/data.txt"); //读取数据文件
List data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD distData = sc.parallelize(data); //加载数据
//TODO 2.transformation
//map算子 使原值乘2
JavaRDD disRDD = distData.map(new Function() {
@Override
public Integer call(Integer s) throws Exception {
return s*2;
}
});
//map算子 使原值乘2 lambda 表达式
JavaRDD disRDD2 = distData.map((Function) s -> s*2);
//输出结果
disRDD.foreach(new VoidFunction() {
@Override
public void call(Integer s) throws Exception {
System.out.println(s);
}
});
//输出结果 lambda 表达式
disRDD2.foreach((VoidFunction) s -> System.out.println(s));
//按空格分割数据
JavaRDD newlines= lines.flatMap(new FlatMapFunction() {
@Override
public Iterator call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//按空格分割数据 lambda 表达式
JavaRDD newlines2 = lines.flatMap((FlatMapFunction) s->Arrays.asList(s.split(" ")).iterator());
//对单词计数
JavaPairRDD wordsRDD = newlines.mapToPair(new PairFunction() {
@Override
public Tuple2 call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
//对单词计数 lambda 表达式
JavaPairRDD wordsRDD2 = newlines2.mapToPair((PairFunction) s -> new Tuple2<>(s,1));
//分组聚合
JavaPairRDD newwordsRDD = wordsRDD.reduceByKey(new Function2() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
//分组聚合 lambda 表达式
JavaPairRDD newwordsRDD2 = wordsRDD2.reduceByKey((Function2) (a,b)->(a+b));
//输出到控制台
newwordsRDD.foreach(new VoidFunction>() {
@Override
public void call(Tuple2 t) throws Exception {
System.out.println(t._1+" "+t._2);
}
});
//输出到控制台 lambda 表达式
newwordsRDD2.foreach((VoidFunction>) t-> System.out.println(t._1()+" "+t._2()));
//输出到目录
newwordsRDD2.repartition(1).saveAsTextFile("data/output/spark2");
//join
List> studentList = Arrays.asList(
new Tuple2(1, "张三"),
new Tuple2(2, "李四"),
new Tuple2(3, "王五"),
new Tuple2(4, "赵六"));
List> scoreList = Arrays.asList(
new Tuple2(1, 86),
new Tuple2(2, 67),
new Tuple2(3, 78),
new Tuple2(4, 88));
//并行化两个RDD
JavaPairRDD students = sc.parallelizePairs(studentList);;
JavaPairRDD scores = sc.parallelizePairs(scoreList);
//使用join算子关联两个RDD
//join以后,会根据key进行join,并返回JavaPairRDD
//JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为通过key进行join的
//第二个泛型类型,是Tuple2的类型,Tuple2的两个泛型分别为原始RDD的value的类型
JavaPairRDD> studentScores = students.join(scores);
//打印
studentScores.foreach(new VoidFunction>>() {
@Override
public void call(Tuple2> t)
throws Exception {
System.out.println("student id:" + t._1);
System.out.println("student name:" + t._2._1);
System.out.println("student score:" + t._2._2);
System.out.println("==========================");
}
});
sc.close();
}