Java
public class WordCount {
public static void main(String[] args) {
// 配置sparkconf,启动spark应用程序
SparkConf sparkConf = new SparkConf().setAppName("spark").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//使用textFile方法读取文件,逐行读取
JavaRDD line = sc.textFile("D:/JavaCode/javascala/src/main/scala/rdd/builder/word");
// flatMap算子,拆分单词
JavaRDD words = line.flatMap(new FlatMapFunction() {
public Iterator call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//map算子,对单词进行映射 word =>(Word,1)
JavaPairRDD word = words.mapToPair(new PairFunction() {
public Tuple2 call(String s) throws Exception {
return new Tuple2(s, 1);
}
});
//reduceByKey算子,最相同的单词进行分组聚合
JavaPairRDD wordCount = word.reduceByKey(new Function2() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//遍历打印结果
wordCount.foreach(new VoidFunction>() {
public void call(Tuple2 stringIntegerTuple2) throws Exception {
System.out.println(stringIntegerTuple2._1+":"+stringIntegerTuple2._2);
}
});
sc.stop();
}
}
Scala
/**
 * Spark word count, Scala RDD API version.
 *
 * Reads a local text file, splits lines into words, aggregates a count per
 * word, and prints the results (sorted by descending count, then grouped).
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Set up and start the Spark application (local mode, all cores).
    val conf    = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val context = new SparkContext(conf)
    // 2. Load the input file as an RDD of lines.
    val lines = context.textFile("D:/JavaCode/javascala/src/main/scala/rdd/builder/word")
    // 3. Split each line into words.
    val tokens = lines.flatMap(line => line.split(" "))
    // 4. Pair every word with an initial count of 1.
    val pairs = tokens.map(token => (token, 1))
    // 5. Sum the counts for each distinct word.
    val counts = pairs.reduceByKey((a, b) => a + b)
    // 6. Print the results: sorted by descending count, then grouped by key.
    //    NOTE: RDD.foreach(println) prints on executors; fine in local mode only.
    counts.sortBy(pair => -pair._2).collect().foreach(println)
    counts.groupByKey().foreach(println)
    // 7. Shut down the Spark context.
    context.stop()
  }
}
总结:使用 Scala 开发 Spark 应用程序,相对 Java 更简洁、更方便。



