1.中文分词
中文分词选用的是Ansj框架。分词部分使用Java语言封装,由Spark中的flatMap算子调用。分词后根据词性做了筛选。
package com.ahn.spark;
import org.ansj.domain.Result;
import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import java.util.*;
public class AnsjTest {
public static List fenci(String str){
//筛选词性集合
Set expectedNature = new HashSet() {{
add("n");
add("v");
add("vd");
add("vn");
add("vf");
add("vx");
add("vi");
add("vl");
add("vg");
add("nt");
add("nz");
add("nw");
add("nl");
add("ng");
add("userDefine");
add("wh");
}};
//存放结果的List
List array=new ArrayList<>();
//开始分词
Result result = ToAnalysis.parse(str);
//得到结果集
List terms = result.getTerms();
terms.forEach(t->{
//根据筛选集合筛选出符合条件的数据
if (expectedNature.contains(t.getNatureStr())) {
array.add(t.getName());
}
});
return array;
}
}
2.词频统计
词频统计使用Scala进行编写,具体步骤与WorldCount相同。
package com.ahn.spark
import org.apache.spark.{SparkConf, SparkContext}
import com.ahn.spark.AnsjTest.{fenci}
object Test {
def main(args: Array[String]): Unit = {
//初始化SparkConf
val conf: SparkConf = new SparkConf()
//设置使用本地运行的方式 还可以选择Spark自身集群的方式 以及on Yarn的方式
conf.setMaster("local")
//设置应用名称
conf.setAppName("SparkTest")
//初始化SparkContext
val context = new SparkContext(conf)
//设置日志打印级别
context.setLogLevel("ERROR")
//读取数据文件
val value = context.textFile("src/main/resources/test.txt")
//flatMap算子中使用fenci()方法
val words = value.flatMap(v => fenci(v).toArray())
//Map算子将数据处理为(key 1)的形式
val pairs = words.map(word => (word, 1))
//reduceByKey根据key进行合并 将value的值进行相加。
val wordcounts = pairs.reduceByKey(_ + _)
//根据value值进行降序排序 并取出前10条记录打印
wordcounts.sortBy(x=>x._2,false).take(10).foreach(println)
}
}
3.pom文件
4.0.0 org.example spark_test1.0-SNAPSHOT 8 8 aliyun http://maven.aliyun.com/nexus/content/groups/public/ cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ jboss http://repository.jboss.com/nexus/content/groups/public org.apache.spark spark-core_2.123.0.1 org.apache.spark spark-sql_2.123.0.1 provided org.ansj ansj_seg5.1.6
4.项目目录
5.结果



