package thisterm;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class wordcount2 {
public static void main(String[] args) {
// TODO Auto-generated method stub
SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD dataFile = sc.textFile
("file:///home/gyq/下载/spark-2.3.2-bin-hadoop2.7/data/words.txt".toLowerCase());//*是所有文件内容
JavaRDD rdd1=dataFile.map(f->f.trim().
replaceAll("[\pP+~$`^=|<>~`$^+=|<>¥×]" , ""));//trim()去首位空格replaceAll将奇怪的符号去掉
JavaRDD rdd2=rdd1.map(f->f.replaceAll("\s+", " "));//将连续的空格变成一个空格
JavaRDD rdd3=rdd2.flatMap(f->Arrays.asList(f.split(" ")).iterator());//用空格分开
JavaRDD rdd5=rdd3.filter(f->{
if(f.length()==0)
return false;
else
return true;
});
JavaPairRDD rdd6=rdd5.mapToPair(f->new Tuple2<>(f.trim(),1));
JavaPairRDD rdd7=rdd6.reduceByKey((x,y)->x+y);
JavaPairRDD rdd8=rdd7.mapToPair(f->new Tuple2<>(f._2,f._1));
JavaPairRDD rdd9=rdd8.sortByKey(false,2).filter(f->f._1>3);//sortByKey根据key排序
List> rdd10=rdd9.collect();
// rdd9.saveAsTextFile("file:///home/gyq/下载/spark-2.3.2-bin-hadoop2.7/data/r大于3的.txt");
for(Tuple2 aa:rdd10) {
System.err.println(aa);
}
sc.stop();
}
}
保存到电脑里面



