- hanlp处理数据流入kafka
- hanlp使用可以参考下面官网
- 本文样例
- 读文本文件,通过hanlp分词
- 进行了文本表情剔除、url剔除等正则处理;带词性分析需要下载hanlp的data放到resource下,使用标准分词的不用
- 写 入kafka
- kafka中数据参考:
- flink处理kafka数据
- 本地webui 需要引入依赖,加上本地启动的配置即可访问页面
- 效果图
- pom文件
本文样例https://www.hanlp.com/
{"text":"这是个文本样例啊"}
{"text":"这是个文本样例啊"}
读文本文件,通过hanlp分词
进行了文本表情剔除、url剔除等正则处理;带词性分析需要下载hanlp的data放到resource下,使用标准分词的不用
public void run() {
//停用词使用
CoreStopWordDictionary.load(HanLP.Config.CoreStopWordDictionaryPath, true);
// 分词结果不显示词性
HanLP.Config.ShowTermNature = false;
HanLP.Config.Normalization = true;
StringBuilder sb = new StringBuilder();
try{
File file = new File(filePath);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String str = null;
while ((str = bufferedReader.readLine()) != null) {
String inputStr = JSONObject.parseObject(str).getString("text");
//去除url 表情
String urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http):((//)|(\\))+[\w\d:#@%/;$()~_?\+-=\\\.&]*)[\ud83c\udc00-\ud83c\udfff]|[\ud83d\udc00-\ud83d\udfff]|[\u2600-\u27ff]|[\ud83e\udd00-\ud83e\uddff]|[\u2300-\u23ff]|[\u2500-\u25ff]|[\u2100-\u21ff]|[\u0000-\u00ff]|[\u2b00-\u2bff]|[\u2d06]|[\u3030]|\pP|\pS";
Pattern p = Pattern.compile(urlPattern, Pattern.CASE_INSENSITIVE);
Matcher m = p.matcher(inputStr);
inputStr = m.replaceAll("");
sb.append(inputStr);
}
//停用词使用
List segment = NLPTokenizer.segment(sb.toString());
CoreStopWordDictionary.apply(segment);
//写kafka
writeToKf(segment);
}catch (Exception e){
e.printStackTrace();
}
}
写入kafka
public void writeToKf(Listkafka中数据参考: flink处理kafka数据segment) { //寫入kafka Properties ps = new Properties(); ps.setProperty("bootstrap.servers","ip:9092");//集群地址 // ps.setProperty("group.id", "hotwords"); ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key序列化方式 ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value序列化方式 KafkaProducer kafkaProducer = new KafkaProducer (ps); ProducerRecord record = new ProducerRecord ("hotwords", segment.toString()); kafkaProducer.send(record); kafkaProducer.close(); try { Thread.sleep(1000*20); }catch (Exception e){ e.printStackTrace(); } }
参照了下面文章做的实验
本地webui 需要引入依赖,加上本地启动的配置即可访问页面https://blog.csdn.net/m0_49834705/article/details/115023005
class App {
public static void main(String[] args) throws Exception {
//本地啟動,webui
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger(RestOptions.PORT, 2841);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, config);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","ip:9092");
properties.setProperty("group.id", "hotwords");
// properties.setProperty("auto.offset.reset", "latest");
// properties.setProperty("enable.auto.commit", "true")
// properties.setProperty("auto.commit.interval.ms", "15000")
//消費主題 hotword;自動提交offset
DataStreamSource text = env.addSource(
new FlinkKafkaConsumer("hotwords",
new SimpleStringSchema(),properties)
.setStartFromLatest()
// .setStartFromEarliest()
//自動提交
.setCommitOffsetsOnCheckpoints(true));
DataStream> ds = text.flatMap(new LineSplitter());
DataStream> wcount = ds
.keyBy(0) //按照Tuple2的第一个元素为key,也就是单词
.window(SlidingProcessingTimeWindows.of(Time.seconds(60),Time.seconds(20)))
//key之后的元素进入一个总时间长度为600s,每20s向后滑动一次的滑动窗口
.sum(1);// 将相同的key的元素第二个count值相加
// wcount.print("wc: ");
//所有key元素进入一个20s长的窗口(选20秒是因为上游窗口每20s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
DataStream> ret =
wcount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))).
process(new TopNAllFunction(20));//计算该窗口TopN
//保存到mysql
ret.addSink(new MySQLSink());
//打印
// ret.print("ret: ");
// wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分组
// .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口统计上游数据
// .process(new TopNFunction(5)); //分组TopN统计
env.execute("Window WordCount");
}
private static final class LineSplitter implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) {
// normalize and split the line
String[] tokens = value.split(",");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
private static class TupleKeySelectorByStart implements KeySelector, String> {
@Override
public String getKey(Tuple2 value) throws Exception {
// TODO Auto-generated method stub
return value.f0.substring(0, 1); //取首字母做key
}
}
}
public class TopNAllFunction extends ProcessAllWindowFunction, Tuple2 , TimeWindow> { private int topSize = 10; public TopNAllFunction(int topSize) { // TODO Auto-generated constructor stub this.topSize = topSize; } @Override public void process(Context context, Iterable > input, Collector > out) throws Exception { TreeMap > treemap = new TreeMap >( new Comparator () { @Override public int compare(Integer y, Integer x) { return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆盖 for (Tuple2 element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN个元素 treemap.pollLastEntry(); } } for (Map.Entry > entry : treemap.entrySet()) { out.collect(entry.getValue()); } } }
public class MySQLSink extends RichSinkFunction> { Connection connection = null; PreparedStatement insertSmt = null; @Override public void open(Configuration parameters) throws Exception { String url = "jdbc:mysql://ip:3306/test-flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false"; connection = (Connection) DriverManager.getConnection(url,"root","Founder123"); insertSmt = (PreparedStatement) connection.prepareStatement("insert into tmp(word,num,cdate) values (?,?,?)"); } @Override public void invoke(Tuple2 value, Context context) throws Exception { //直接执行更新语句 // System.out.println("value.toString()-------" + value.toString()); insertSmt.setString(1,value.f0); insertSmt.setInt(2,value.f1); insertSmt.setString(3, LocalDateTime.now().toString()); insertSmt.execute(); } @Override public void close() throws Exception { if (insertSmt != null){ insertSmt.close(); } if (connection != null){ connection.close(); } } //jdbc // JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() // .setDrivername("com.mysql.cj.jdbc.Driver") // .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456") // .setQuery("insert into words (word,count) values (?,?) ") // //设置为每2条数据就提交一次 // .setBatchInterval(2) // .finish(); }
CREATE TABLE `tmp` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `num` int(11) DEFAULT NULL,
  `cdate` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

效果图

pom文件
UTF-8 UTF-8 1.8 2.11 23.0 1.13.2 2.10.4 1.3.3 org.springframework.boot spring-boot-starter-web com.alibaba fastjson 1.2.4 org.apache.commons commons-lang3 3.6 org.apache.flink flink-connector-kafka_${scala.version} ${flink.version} org.apache.flink flink-streaming-java_${scala.version} ${flink.version} org.apache.flink flink-clients_${scala.version} ${flink.version} org.apache.flink flink-core ${flink.version} org.springframework.boot spring-boot-starter-test commons-io commons-io 2.5 com.hankcs hanlp portable-1.8.3



