栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink DataStream之min()与minBy(),max()与maxBy()区别详解

Flink DataStream之min()与minBy(),max()与maxBy()区别详解

在Flink中有一类滚动聚合的算子(Rolling Aggregation):

sum()、min()、minBy()、max()、maxBy()

其中,对于min()和minBy(),max()和maxBy()之间的区别,具体如下:

1、处理的数据只有两个字段:

即:只有分组字段和比较字段,

如城市温度数据(city,temp),其中city用来分组(keyBy),temp用来比较(min/minBy),

那么,此时min()和minBy()的作用是一样的,都是得到比较字段的最小值。案例代码如下:读者也可以自己写个demo测试一下。

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port","8091");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        //从端口号获取数据
        DataStreamSource dataSource = env.socketTextStream("10.12.36.102", 8888);

        //=========================两个字段==============
        //将数据转化为Tuple2类型,(city,temp)
        SingleOutputStreamOperator> mapedStream = dataSource.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                String[] fields = value.split(" ");
                return Tuple2.of(fields[0], Float.valueOf(fields[1]));
            }
        });
        
        //按照city进行分组
        KeyedStream, String> keyedStream = mapedStream.keyBy(tp -> tp.f0);
        
        //按照temp进行比较
        //测试min算子
        SingleOutputStreamOperator> minStream = keyedStream.min(1);

        //测试minBy算子
        //SingleOutputStreamOperator> minByStream = keyedStream.minBy(1);
      
        minStream.print();
        env.execute();
    }
2、处理数据有多个字段:

即:除分组字段、比较字段外,还有一些其他的字段,如Tuple3及以上类型,或者POJO类型数据

2.1 对于min()算子

返回的是分组字段和比较字段的最小值,除分组字段和比较字段之外的字段,则返回的是第一次出现时的值,也就是说min()返回指定字段的最小值,但却不是该最小值所在的那一整条数据。

(干说文字可晦涩,我们举个例子说明一下)

以 (prov,city,temp)类型数据为例,其中prov进行分组,temp进行比较,temp不参与任何情况。

输入:(辽宁,大连,36.5)-->输出(辽宁,大连,36.5),因为此时只有这一条记录

输入:(辽宁,沈阳,32.4)-->输出(辽宁,大连,32.4),此时最小值为32.4,但是city字段显示的是第一次出现的值

输入:(辽宁,锦州,10.0)-->输出(辽宁,大连,10.0),此时最小值为10.0,city字段仍然显示的是第一个出现的值

2.2 对于minBy()算子

返回的就是最小值所在的最新的整条数据。

(辽宁,大连,36.5)-->输出(辽宁,大连,36.5)

(辽宁,沈阳,32.4)-->输出(辽宁,沈阳,32.4)

(辽宁,锦州,10.0)-->输出(辽宁,锦州,10.0)

★★★★★★对于minBy()算子,还有一个boolean参数★★★★★★

minBy(2),默认缺省值就为true,当比较字段最小值出现相等的情况时,其他字段返回第一次出现时的值,举例如下:

(辽宁,大连,32)-->输出(辽宁,大连,32)

(辽宁,沈阳,32)-->输出(辽宁,大连,32),city返回的是第一次出现的值

(辽宁,锦州,32)-->输出(辽宁,大连,32),city仍返回的是第一次出现的值

minBy(2,fasle),设置为false时,当比较字段最小值出现相等的情况时,则返回最新的最小值所在的整条数据

(辽宁,大连,32)-->输出(辽宁,大连,32)

(辽宁,沈阳,32)-->输出(辽宁,沈阳,32),返回最新的整条数据

(辽宁,锦州,32)-->输出(辽宁,锦州,32),返回最新的整条数据

处理多字段数据的案例代码如下,读者们可以自测一下:

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("rest.port","8091");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStreamSource dataSource = env.socketTextStream("10.12.36.102", 8888);

        //==============三个字段=====================
        //将数据转化为Tuple3类型
        SingleOutputStreamOperator> mapStream = dataSource.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                String[] fields = value.split(" ");
                return Tuple3.of(fields[0], fields[1],Float.valueOf(fields[2]));
            }
        });
        
        //按照prov分组
        KeyedStream, String> keyStream = mapStream.keyBy(tp->tp.f0);

        //按temp进行比较
        SingleOutputStreamOperator> minStream = keyStream.minBy(2,false);

        minStream.print();
        env.execute();
    }

好啦,这样举个例子应该就清晰多了,最后再总结一下:

1、处理数据只有两个字段:

        min()和minBy()作用一样,均为取最小值

2、处理数据有多个字段:

        ★min()返回比较字段的最小值,但不是最小值所在的整体数据

        ★minBy()返回比较字段所在的最新整体数据

                如果比较字段出现最小值相等的情况:

                > 使用minBy(...,true)时,(true为默认可省略),其他字段返回第一次出现时的值

                > 使用minBy(...,false)时,返回最新的最小值所在的整条数据


我是smallk,自学大数据,拿到百度、京东、小米、顺丰、58、哈罗、海康等22家大数据offer,欢迎仍在数据路上的小伙伴,我们一起讨论前行。


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600480.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号