栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink Window那些事——AggregateFunction窗口函数

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink Window那些事——AggregateFunction窗口函数

原文链接:Flink Window那些事——AggregateFunction窗口函数 - Ruthless - 博客园

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型是输入流中的元素类型,AggregateFunction有一个add方
法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.lynch.stream.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TestAggFunctionOnWindow {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取数据
        DataStream> input = env.fromElements(ENGLISH);

        // 求各个班级英语成绩平均分
        DataStream avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate());

        avgScore.print();

        env.execute("TestAggFunctionOnWindow");

    }

    public static final Tuple3[] ENGLISH = new Tuple3[] { 
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L), 
            Tuple3.of("class1", "王五", 60L), 
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L), 
            Tuple3.of("class2", "小八", 50L), 
    };

    //Tuple3 输入类型
    //Tuple2 累加器ACC类型,保存中间状态
    //Double 输出类型
    public static class AverageAggrate implements AggregateFunction, Tuple2, Double> {
        
        @Override
        public Tuple2 createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        
        @Override
        public Tuple2 add(Tuple3 value, Tuple2 acc) {
            //acc.f0 总成绩 
            //value.f2 表示成绩
            //acc.f1 人数
            return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L);
        }

        
        @Override
        public Double getResult(Tuple2 acc) {
            return ((double) acc.f0) / acc.f1;
        }

        
        @Override
        public Tuple2 merge(Tuple2 acc1, Tuple2 acc2) {
            return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
        }
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710131.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号