栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink 经典WordCount入门案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink 经典WordCount入门案例

  • 以批处理的方式从文本读取数据:
package com.hmi1024.flink.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        
        //初始化flink批处理的运行环境(获取到当前环境,如果本地运行获取local环境)
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //指定文件路径,获取文件数据
        final DataSource lines = env.readTextFile("./data/input/wordcount.txt");

        //对获取到的数据进行空格拆分
        //map与flatmap的区别
        //String:传入值类型
        //String:返回值类型
        
        final FlatMapOperator words = lines.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String line, Collector out) throws Exception {
                //将每行字符串进行空格拆分
                final String[] dataArray = line.split(" ");
                //循环遍历字符串数组
                for (String word : dataArray) {
                    //需要使用out进行返回数据
                    out.collect(word);
                }
            }
        });

        
        final MapOperator> wordAndOne = words.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        //对相同的单词进行分组操作
        final UnsortedGrouping> grouped = wordAndOne.groupBy(0);

        //对分组后的数据进行累加操作
        final AggregateOperator> summed = grouped.sum(1);

        //打印输出(测试)
        summed.print();

        //todo 8)启动作业,递交任务
        //在批处理开发中以下方法会触发作业的递交操作,故无需 env.execute()
        //'execute()', 'count()', 'collect()', or 'print()'.
        //env.execute();
    }
}

  • 以流处理的方式读取socket里面的数据
package com.hmi1024.flink.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        
        //初始化flink流处理的运行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
		//设置并行度
        env.setParallelism(2);

        //指定socket数据源,获取数据
        final DataStreamSource socketTextStream = env.socketTextStream("node1", 9999);

        //对获取到的数据进行空格拆分
        final SingleOutputStreamOperator words = socketTextStream.flatMap(
                new FlatMapFunction() {
            @Override
            public void flatMap(String line, Collector out) throws Exception {
                //将每行字符串进行空格拆分
                final String[] dataArray = line.split(" ");
                //循环遍历字符串数组
                for (String word : dataArray) {
                    //需要使用out进行返回数据
                    out.collect(word);
                }
            }
        });
        
        //对拆分的单词进行计数,每个单词记一次数
        final SingleOutputStreamOperator> wordAndOne = words.map(
                new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });

        //对相同的单词进行分组操作
        final KeyedStream, Tuple> keyedStream = wordAndOne.keyBy(0);

        //对分组后的数据进行累加操作
        final SingleOutputStreamOperator> summed = keyedStream.sum(1);

        //打印输出(测试)
        summed.print();

        //启动作业,递交任务
        env.execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/821654.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号