栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink读取kafka数据并进行单词计数

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink读取kafka数据并进行单词计数

Flink读取kafka数据并进行单词计数

前言:
这是一个使用Flink实时读取kafka数据,并将单词个数统计实时输出到控制台的demo.

依赖:

    
        
            org.apache.flink
            flink-java
            1.14.0
        

        
            org.apache.flink
            flink-connector-kafka_2.12
            1.14.0
        

        
            org.apache.flink
            flink-streaming-java_2.12
            1.14.0
            provided
        

        
            org.apache.flink
            flink-clients_2.12
            1.14.0
        

       
        
            org.apache.kafka
            kafka-clients
            2.8.0
        
    

导包:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

java代码:

public class WordCounInTime {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置监控数据流时间间隔(官方叫状态与检查点)
        see.enableCheckpointing(5000);
        Properties properties = new Properties();
        //kafka默认端口号9092
        properties.setProperty("bootstrap.servers","localhost:9092");
        //zookeeper默认端口2181
        properties.setProperty("zookeeper.connect", "localhost:2181");
        //给消费者分组,同一组的消费者不会消费相同的消息
        properties.setProperty("group.id","test");
        //反序列化
//        properties.setProperty("key.deserializer","org.apache.kafka .common.serialization.StringDeserializer");
//        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("kafka-test",new SimpleStringSchema(), properties);
        //指定偏移量,这里指定从最后一个之后再读取。
        consumer.setStartFromLatest();

        DataStreamSource data = see.addSource(consumer);

        SingleOutputStreamOperator> map = data.flatMap(new FlatMapIterator() {
            @Override
            public Iterator flatMap(String s) throws Exception {
                String replace = s.replace(",", "").replace(".", "").replace("?", "");
                return Arrays.asList(replace.split(" ")).iterator();
            }
        }).map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        map.keyBy(0).sum(1).print();
        see.execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318610.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号