
Tokenize with HanLP, write to Kafka, compute a real-time hot-word top N with Flink word counting, and write the result to MySQL


  • HanLP processing, data into Kafka
      • HanLP usage: see the official site
    • Sample input for this article
    • Read a text file and tokenize with HanLP
      • Emoji and URLs are removed with a regex; part-of-speech analysis needs HanLP's data under resources, the standard tokenizer does not
    • Write to Kafka
      • Sample of the data in Kafka
  • Flink processing of the Kafka data
      • Local WebUI: needs an extra dependency plus a local-start configuration
      • Result screenshot
      • pom file

HanLP processing, data into Kafka. For HanLP usage, see the official site:

https://www.hanlp.com/

Sample input for this article:
{"text":"这是个文本样例啊"}
{"text":"这是个文本样例啊"}
Read a text file and tokenize with HanLP. Emoji and URLs are stripped with a regex before segmentation. Part-of-speech analysis requires downloading HanLP's data into the resources directory; the standard tokenizer does not need it.
public void run() {
    // load the stopword dictionary
    CoreStopWordDictionary.load(HanLP.Config.CoreStopWordDictionaryPath, true);
    // do not show part-of-speech tags in the segmentation result
    HanLP.Config.ShowTermNature = false;
    HanLP.Config.Normalization = true;
    StringBuilder sb = new StringBuilder();
    try {
        File file = new File(filePath);
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), "utf-8"));
        String str = null;
        while ((str = bufferedReader.readLine()) != null) {
            String inputStr = JSONObject.parseObject(str).getString("text");
            // strip URLs, emoji, punctuation and symbols
            String urlPattern = "((https?|ftp|gopher|telnet|file)://[\\w\\d:#@%/;$()~_?\\+\\-=\\\\.&]*)"
                    + "|[\ud83c\udc00-\ud83c\udfff]|[\ud83d\udc00-\ud83d\udfff]"
                    + "|[\u2600-\u27ff]|[\ud83e\udd00-\ud83e\uddff]|[\u2300-\u23ff]"
                    + "|[\u2500-\u25ff]|[\u2100-\u21ff]|[\u0000-\u00ff]|[\u2b00-\u2bff]"
                    + "|[\u2d06]|[\u3030]|\\p{P}|\\p{S}";
            Pattern p = Pattern.compile(urlPattern, Pattern.CASE_INSENSITIVE);
            Matcher m = p.matcher(inputStr);
            inputStr = m.replaceAll("");
            sb.append(inputStr);
        }
        bufferedReader.close();
        // segment the text, then remove stopwords
        List<Term> segment = NLPTokenizer.segment(sb.toString());
        CoreStopWordDictionary.apply(segment);
        // write the result to Kafka
        writeToKf(segment);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
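The URL-stripping step can be exercised in isolation. A minimal, self-contained sketch using only the URL part of the pattern (the full pattern above additionally strips emoji blocks, punctuation, and symbols):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StripDemo {
    public static void main(String[] args) {
        // simplified version of the URL alternative in the pattern above
        String urlPattern = "(https?|ftp)://[\\w\\d:#@%/;$()~_?\\+\\-=\\\\.&]*";
        Pattern p = Pattern.compile(urlPattern, Pattern.CASE_INSENSITIVE);
        Matcher m = p.matcher("看这个链接 https://example.com/a?b=1 很有用");
        // the URL is removed; the surrounding text is untouched
        System.out.println(m.replaceAll("")); // 看这个链接  很有用
    }
}
```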
Write to Kafka
    public void writeToKf(List<Term> segment) {
        // write the segmented terms to Kafka
        Properties ps = new Properties();
        ps.setProperty("bootstrap.servers", "ip:9092"); // cluster address
//        ps.setProperty("group.id", "hotwords");
        ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer
        ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(ps);
        ProducerRecord<String, String> record = new ProducerRecord<>("hotwords", segment.toString());
        kafkaProducer.send(record);
        kafkaProducer.close();
        try {
            Thread.sleep(1000 * 20); // throttle: send one message every 20 s
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
Sample of the data in Kafka (screenshot not preserved):
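Since the producer sends `segment.toString()`, the message value is the plain `List.toString()` form. A minimal sketch (a `List<String>` standing in for HanLP's term list) of that format, and of how the downstream `split(",")` in `LineSplitter` sees it:

```java
import java.util.Arrays;
import java.util.List;

public class SegmentFormatDemo {
    public static void main(String[] args) {
        // stand-in for the segmented term list passed to writeToKf
        List<String> segment = Arrays.asList("文本", "样例", "分词");
        // this is exactly what the producer sends as the message value
        String message = segment.toString();
        System.out.println(message); // [文本, 样例, 分词]

        // downstream, LineSplitter splits on "," — the brackets and the
        // leading space of each later token survive inside the tokens
        String[] tokens = message.split(",");
        System.out.println(tokens[0]); // [文本
        System.out.println(tokens[1]); // " 样例" (with a leading space)
    }
}
```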

Flink processing of the Kafka data

The experiment follows this article:

https://blog.csdn.net/m0_49834705/article/details/115023005

Local WebUI: pull in the required dependency and add the local-start configuration below, and the page becomes accessible.
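The WebUI dependency does not appear in the pom at the end of this article; assuming the standard Flink artifact for this setup (Flink 1.13.x, Scala 2.11), it would look like:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```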

class App {
    public static void main(String[] args) throws Exception {
        // local start with WebUI
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setInteger(RestOptions.PORT, 2841);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, config);

//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ip:9092");
        properties.setProperty("group.id", "hotwords");
//      properties.setProperty("auto.offset.reset", "latest");
//      properties.setProperty("enable.auto.commit", "true");
//      properties.setProperty("auto.commit.interval.ms", "15000");
        // consume topic "hotwords"; commit offsets automatically on checkpoints
        DataStreamSource<String> text = env.addSource(
                new FlinkKafkaConsumer<>("hotwords",
                        new SimpleStringSchema(), properties)
                        .setStartFromLatest()
//                        .setStartFromEarliest()
                        // auto-commit offsets
                        .setCommitOffsetsOnCheckpoints(true));

        DataStream<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter());

        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0) // key by the first field of the Tuple2, i.e. the word
                .window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(20)))
                // keyed elements enter a 60 s window that slides forward every 20 s
                .sum(1); // sum the count field of elements with the same key

//        wcount.print("wc: ");

        // all elements enter one 20 s tumbling window (20 s because the upstream
        // window emits every 20 s, so one top-N computation covers exactly one
        // round of upstream output)
        DataStream<Tuple2<String, Integer>> ret =
                wcount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                        .process(new TopNAllFunction(20)); // compute the window's top N

        // save to MySQL
        ret.addSink(new MySQLSink());
        // print
//        ret.print("ret: ");

//        wcount.keyBy(new TupleKeySelectorByStart()) // group by first character
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 20 s window over upstream data
//                .process(new TopNFunction(5)); // per-group top N

        env.execute("Window WordCount");
    }


    private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.split(",");
            // emit (token, 1) pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }


    private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0.substring(0, 1); // use the first character as the key
        }
    }
}
public class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    private int topSize = 10;

    public TopNAllFunction(int topSize) {
        this.topSize = topSize;
    }


    @Override
    public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>(
                new Comparator<Integer>() {
                    @Override
                    public int compare(Integer y, Integer x) {
                        return (x < y) ? -1 : 1;
                    }
                }); // keys in descending order; never returns 0, so equal counts do not overwrite each other
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) { // keep only the top N elements
                treemap.pollLastEntry();
            }
        }

        for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}
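The TreeMap-based truncation can be sketched in isolation. Note the comparator never returns 0, so entries with equal counts coexist in the map instead of overwriting each other (at the cost of an ordering that is inconsistent with `equals`):

```java
import java.util.Map;
import java.util.TreeMap;

public class TopNDemo {
    public static void main(String[] args) {
        int topSize = 3;
        // descending by count; never returns 0, so equal counts coexist
        TreeMap<Integer, String> top = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);
        int[] counts = {5, 3, 5, 9, 1};
        String[] words = {"aa", "bb", "cc", "dd", "ee"};
        for (int i = 0; i < counts.length; i++) {
            top.put(counts[i], words[i]);
            if (top.size() > topSize) {
                top.pollLastEntry(); // evict the entry with the smallest count
            }
        }
        // keeps dd (count 9) and both count-5 entries aa and cc;
        // bb (3) and ee (1) are evicted
        for (Map.Entry<Integer, String> e : top.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```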
public class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {
    Connection connection = null;
    PreparedStatement insertSmt = null;


    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://ip:3306/test-flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
        connection = DriverManager.getConnection(url, "root", "Founder123");
        insertSmt = connection.prepareStatement("insert into tmp(word,num,cdate) values (?,?,?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // execute the insert directly
//        System.out.println("value.toString()-------" + value.toString());
        insertSmt.setString(1, value.f0);
        insertSmt.setInt(2, value.f1);
        insertSmt.setString(3, LocalDateTime.now().toString());
        insertSmt.execute();
    }

    @Override
    public void close() throws Exception {
        if (insertSmt != null) {
            insertSmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    // alternative: JDBCOutputFormat
//            JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
//                    .setDrivername("com.mysql.cj.jdbc.Driver")
//                    .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456")
//                    .setQuery("insert into  words (word,count) values (?,?) ")
//                    // commit every 2 records
//                    .setBatchInterval(2)
//                    .finish();
}
CREATE TABLE `tmp` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `num` int(11) DEFAULT NULL,
  `cdate` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Result screenshot (not preserved)

pom file

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <flink.version>1.13.2</flink.version>
    <!-- the original also listed version values 23.0, 2.10.4 and 1.3.3;
         their property names are not recoverable -->
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>

    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.8.3</version>
    </dependency>
</dependencies>
Reprinted from www.mshxw.com; original article: https://www.mshxw.com/it/880571.html