栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink基础系列16-Tranform之聚合操作算子

Flink基础系列16-Tranform之聚合操作算子

文章目录
  • 一.聚合操作算子简介
    • 1.1 KeyBy
    • 1.2 Rolling Aggregation
    • 1.3 reduce
  • 二.代码实现
    • 2.1 maxby
    • 2.2 reduce
  • 参考:

一.聚合操作算子简介

DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。

先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)

常见的聚合操作算子主要有:

  1. keyBy
  2. 滚动聚合算子Rolling Aggregation
  3. reduce
1.1 KeyBy


DataStream -> KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

1、KeyBy会重新分区;
2、不同的key有可能分到一起,因为是通过hash原理实现的;

1.2 Rolling Aggregation

这些算子可以针对KeyedStream的每一个支流做聚合。
sum()
min()
max()
minBy()
maxBy()

1.3 reduce

Reduce适用于更加一般化的聚合操作场景。java中需要实现ReduceFunction函数式接口。

在前面Rolling Aggregation的前提下,对需求进行修改。获取同组历史温度最高的传感器信息,同时要求实时更新其时间戳信息。

二.代码实现

数据准备:
sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

2.1 maxby

代码:

package org.example;



import org.flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import sun.awt.SunHints;

import javax.xml.crypto.Data;

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        // 创建 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 执行环境并行度设置1
        env.setParallelism(1);

        DataStream dataStream = env.readTextFile("C:\Users\Administrator\IdeaProjects\FlinkStudy\src\main\resources\sensor.txt");

//        DataStream sensorStream = dataStream.map(new MapFunction() {
//            @Override
//            public SensorReading map(String value) throws Exception {
//                String[] fields = value.split(",");
//                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
//            }
//        });

        DataStream sensorStream = dataStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // 先分组再聚合
        // 分组
        KeyedStream keyedStream = sensorStream.keyBy("id");
        //KeyedStream keyedStream = sensorStream.keyBy(SensorReading::getId);

        // 滚动聚合,max和maxBy区别在于,maxBy除了用于max比较的字段以外,其他字段也会更新成最新的,而max只有比较的字段更新,其他字段不变
        DataStream resultStream = keyedStream.maxBy("temperature");

        resultStream.print("result");

        env.execute();
    }
}

测试记录:
因为Flink是流式处理,来一条处理一条,而且我设的并行度是1,所以根据文件的顺序,读取到每一条,都会和上一条对比温度,然后输出对应的id、以及温度大的那个的timestamp及temperature。

如果我要输出最新的时间戳,该如何处理呢?
这个留到下一节reduce来处理。

2.2 reduce

代码:

package org.flink.transform;



import org.flink.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
        DataStream inputStream = env.readTextFile("C:\Users\Administrator\IdeaProjects\FlinkStudy\src\main\resources\sensor.txt");

        // 转换成SensorReading类型
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 分组
        KeyedStream keyedStream = dataStream.keyBy("id");

        // reduce聚合,取最大的温度值,以及当前最新的时间戳
        SingleOutputStreamOperator resultStream = keyedStream.reduce(new ReduceFunction() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        keyedStream.reduce( (curState, newData) -> {
            return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
        });

        resultStream.print();
        env.execute();
    }
}

测试记录:
如下可知,输出对应id,当前timestamp,以及当前最大的temperature

参考:
  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342484.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号