Flink Stream Processing: Transform, Grouping and Aggregation


1: map, flatMap, filter
package com.atguigu.transfrom;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // backslashes in a Windows path must be escaped in a Java string literal
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\hello.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // map: return the length of each line
        SingleOutputStreamOperator<Integer> map = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // flatMap: split each line into words and emit only those starting with "h"
        SingleOutputStreamOperator<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(" ");
                for (String s : split) {
                    if (s.startsWith("h")) {
                        out.collect(s);
                    }
                }
            }
        });

        // filter: keep only the elements that start with "h"
        SingleOutputStreamOperator<String> filter = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("h");
            }
        });

        flatMap.print();
        env.execute();
    }
}
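To make the per-element semantics of the three operators concrete, here is a minimal plain-Java sketch using the JDK Stream API instead of Flink. The sample lines are hypothetical; with a real hello.txt the Flink job above behaves the same way per element:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TransformSketch {
    public static void main(String[] args) {
        // hypothetical input lines standing in for hello.txt
        List<String> lines = Arrays.asList("hello world", "hi flink", "bye");

        // map: exactly one output per input (line -> its length)
        List<Integer> lengths = lines.stream()
                .map(String::length)
                .collect(Collectors.toList());

        // flatMap: zero or more outputs per input (words starting with "h")
        List<String> hWords = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(w -> w.startsWith("h"))
                .collect(Collectors.toList());

        // filter: keep or drop each input unchanged (lines starting with "h")
        List<String> hLines = lines.stream()
                .filter(line -> line.startsWith("h"))
                .collect(Collectors.toList());

        System.out.println(lengths); // [11, 8, 3]
        System.out.println(hWords);  // [hello, hi]
        System.out.println(hLines);  // [hello world, hi flink]
    }
}
```

The key distinction: map is one-to-one, flatMap is one-to-many (including zero), and filter is one-to-(zero-or-one).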
2: keyBy
DataStream → KeyedStream: logically splits a stream into disjoint partitions, where each partition contains all elements that share the same key; internally this is implemented with hashing.

3: Rolling aggregation
These operators aggregate each substream of a KeyedStream:
⚫ sum()
⚫ min()
⚫ max()
⚫ minBy()
⚫ maxBy()
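The idea behind hash partitioning can be sketched in a few lines of plain Java. This is an illustration of the concept only, not Flink's actual implementation (Flink hashes keys through murmur and a key-group mapping before assigning them to parallel subtasks):

```java
public class KeyBySketch {
    // Illustrative only: route a record to a partition by hashing its key.
    static int partitionFor(String key, int parallelism) {
        // Math.floorMod avoids a negative index when hashCode() is negative.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Records with the same key always land in the same partition,
        // which is what lets per-key state and aggregates stay local.
        System.out.println(partitionFor("sensor_1", parallelism));
        System.out.println(partitionFor("sensor_1", parallelism)
                == partitionFor("sensor_1", parallelism)); // true: deterministic
    }
}
```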
package com.atguigu.transfrom;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // parse each CSV line into a SensorReading bean
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // group by the "id" field
        KeyedStream<SensorReading, Tuple> keyedStream = map.keyBy("id");
//        KeyedStream<SensorReading, String> keyedStream1 = map.keyBy(SensorReading::getId);

        // rolling aggregation
        // max only updates the temperature field; the other fields keep their previous values
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.max("temperature");
        // maxBy returns the whole record holding the max temperature, so the other fields come from that record
        SingleOutputStreamOperator<SensorReading> maxBy = keyedStream.maxBy("temperature");

        maxBy.print();
        env.execute();
    }
}
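The difference between max and maxBy is easy to miss, so here is a plain-Java sketch of the two rolling semantics. `Reading` is a minimal stand-in for the SensorReading bean, whose source is not shown in the article:

```java
public class MaxVsMaxBy {
    // Minimal stand-in for the SensorReading bean.
    record Reading(String id, long timestamp, double temperature) {}

    // max("temperature") semantics: keep the running record, update only the aggregated field.
    static Reading rollingMax(Reading acc, Reading next) {
        return new Reading(acc.id(), acc.timestamp(),
                Math.max(acc.temperature(), next.temperature()));
    }

    // maxBy("temperature") semantics: keep whichever whole record has the larger field.
    static Reading rollingMaxBy(Reading acc, Reading next) {
        return next.temperature() > acc.temperature() ? next : acc;
    }

    public static void main(String[] args) {
        Reading first  = new Reading("sensor_1", 1L, 35.0);
        Reading second = new Reading("sensor_1", 2L, 37.5);

        System.out.println(rollingMax(first, second));   // timestamp stays 1, temperature becomes 37.5
        System.out.println(rollingMaxBy(first, second)); // the complete second record, timestamp 2
    }
}
```

So after two records, max yields a "mixed" record (old timestamp, new temperature), while maxBy yields exactly the record that contained the maximum.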

4: reduce
KeyedStream → DataStream: an aggregation over a keyed stream that merges the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final result.
package com.atguigu.transfrom;

import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // parse each CSV line into a SensorReading bean
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // group by the "id" field
        KeyedStream<SensorReading, Tuple> keyedStream = map.keyBy("id");

        // reduce: keep the latest timestamp together with the max temperature seen so far
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        reduce.print();
        env.execute();
    }
}
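One point worth stressing: on a stream, reduce emits one result per incoming element rather than a single final value. A plain-Java sketch of that rolling behavior, with hypothetical sample temperatures (the real job reads them from test.txt):

```java
import java.util.ArrayList;
import java.util.List;

public class RollingReduceSketch {
    // Emits the running maximum after every element, like keyedStream.reduce(...) does.
    static List<Double> rollingMax(double[] values) {
        List<Double> emitted = new ArrayList<>();
        Double state = null;
        for (double v : values) {
            // combine the previous aggregation result with the current element
            state = (state == null) ? v : Math.max(state, v);
            emitted.add(state); // each intermediate result goes downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingMax(new double[]{35.0, 37.5, 36.0})); // [35.0, 37.5, 37.5]
    }
}
```

Three input elements produce three output elements; a downstream operator sees the aggregate evolve, not just its end state.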

When reprinting, please credit the source: www.mshxw.com
Original article: https://www.mshxw.com/it/460375.html