栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink+kafka 实现wordcount

flink+kafka 实现wordcount

以下内容基于flink1.12

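pom依赖

<properties>
    <!-- XML tags were lost when this page was extracted; the property names below are
         reconstructed. Only flink.version is confirmed by the ${flink.version} references. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.12.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Use the same Scala binary version (2.12) and Flink version as the artifacts above.
         The original listed flink-connector-kafka_2.11 at 1.12.1, which mixes Scala 2.11 and
         2.12 Flink jars on the classpath. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
    </dependency>
    <!-- The two MySQL dependencies below are not used by the word-count code in this article.
         NOTE: flink-connector-mysql-cdc 2.0.x targets Flink 1.13; confirm compatibility with
         Flink 1.12.2, or remove them if they are not needed. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>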

代码:

@Slf4j
public class ProcessFunction {

    public static void main(String[] args) throws Exception {

        // 声明环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        final Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "source");
        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("source_topic", new SimpleStringSchema(), properties);

        // source
        SingleOutputStreamOperator source = env.addSource(flinkKafkaConsumer)
                .name("source")
                .uid("source");
        // 设置watermark
        

        // transform
        // 1.不使用tuple结构 直接使用自定义实体类
        SingleOutputStreamOperator reduce = process(source);
        // 2.使用tuple结构直接计算
        SingleOutputStreamOperator> tuple = processOfTuple(source);

        tuple.print();


        // sink
        FlinkKafkaProducer stringFlinkKafkaProducer = new FlinkKafkaProducer<>(
                "sink_topic",
                new KafkaSerializationSchemaWrapper<>(
                        "sink_topic", null, false, new SimpleStringSchema()),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        
        reduce.map(
                (MapFunction) count -> {
                    final Gson gson = new GsonBuilder().create();
                    return gson.toJson(count);
                }).addSink(stringFlinkKafkaProducer);

        tuple.map((MapFunction, String>) Tuple2::toString)
                .returns(TypeInformation.of(String.class)).addSink(stringFlinkKafkaProducer);

        // 提交执行
        env.execute("flink-test");
    }

    
    public static SingleOutputStreamOperator> processOfTuple(SingleOutputStreamOperator source){
        SingleOutputStreamOperator> sum = source.flatMap((String line, Collector> collector) -> {
            String[] tokens = line.split(",",-1);
            // 输出结果 (word, 1)
            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        return sum;
    }
    
    public static SingleOutputStreamOperator process(SingleOutputStreamOperator source) {
        SingleOutputStreamOperator objectSingleOutputStreamOperator = source.flatMap((FlatMapFunction) (s, collector) -> {
            String[] split = s.split(",", -1);
            for (String s1 : split) {
                collector.collect(new Count(s1, 1));
            }
        }).returns(TypeInformation.of(Count.class));

        SingleOutputStreamOperator reduce = objectSingleOutputStreamOperator.keyBy(Count::getWord).reduce(new ReduceFunction() {
            @Override
            public Count reduce(Count count, Count t1) throws Exception {
                return new Count(count.getWord(), count.getCount() + t1.getCount());
            }
        });
        return reduce;
    }


    public static class Count implements Serializable {
        String word;
        int count;

        public Count(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return "{" +
                    "word='" + word + ''' +
                    ", count=" + count +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Count count1 = (Count) o;
            return count == count1.count && Objects.equals(word, count1.word);
        }

        @Override
        public int hashCode() {
            return Objects.hash(word, count);
        }
    }


    public static final DateTimeFormatter STANDARD_PATTERN_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    
    public static long getWaterMark(String message) {
        long eventTime;
        try {
            final Gson gson = new GsonBuilder().create();
            final JsonObject json = gson.fromJson(message, JsonObject.class);
            final JsonElement eventTimeElement = json.get("eventTime");
            if (Objects.isNull(eventTimeElement) || eventTimeElement.isJsonNull()) {
                log.warn("input message {} do not contain `eventTime`," +
                        " return `System.currentTimeMillis()` as eventTime", message);
                eventTime = System.currentTimeMillis();
            } else {
                final String eventTimeStr = eventTimeElement.getAsString();
                eventTime = LocalDateTime.parse(eventTimeStr, STANDARD_PATTERN_FORMATTER)
                        .toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
            }
        } catch (Exception exp) {
            log.error("extract `eventTime` from message {} fails," +
                    " return `System.currentTimeMillis()` as eventTime", message, exp);
            eventTime = System.currentTimeMillis();
        }
        return eventTime;
    }
}

附上本地安装kafka步骤(自行测试)
https://blog.csdn.net/sanhongbo/article/details/123574606

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775909.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号