
Chapter 4. Flink Basic Exercises: Transformation Operators

1. Map & FlatMap

1.1 The Map operator
  • Purpose: one-to-one transformation of each element
  • Hands-on case: read strings from Kafka, transform each one (the code below prefixes each message with "map"), and write the result back to Kafka
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class flink04kafka {
    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment: the job's entry point, holding job-wide parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure the Kafka connection
        Properties props = new Properties();
        // Kafka broker address
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        // consumer group id
        props.setProperty("group.id", "console-consumer-8000");
        // key/value deserializers for consumed messages
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // where to start when the group has no committed offset
        props.setProperty("auto.offset.reset", "latest");
        // commit offsets automatically, every 2s
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        // a background thread checks for new Kafka partitions every 10s
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        // 3. Define the Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props);

        // 4. Start from the offsets committed by the consumer group
        consumer.setStartFromGroupOffsets();

        // 5. Use Kafka as the source
        DataStream<String> ds = env.addSource(consumer);

        // 6. Print the input
        ds.print("input");

        // 7. Transform: prefix each string
        DataStream<String> mapDS = ds.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "map" + value;
            }
        });

        // 8. Write the result back to Kafka
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("second", new SimpleStringSchema(), props);
        mapDS.addSink(kafkaSink);

        // 9. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("custom source job");
    }
}
  • Run results

Ⅰ. Producer on the Kafka topic first

Ⅱ. Output printed in IDEA

Ⅲ. Consumer on the Kafka topic second

1.2 The FlatMap operator
  • Purpose: one-to-many transformation of each element
  • Hands-on case: split each string and emit every piece
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class flink04kafka {
    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment: the job's entry point, holding job-wide parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure the Kafka connection
        Properties props = new Properties();
        // Kafka broker address
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        // consumer group id
        props.setProperty("group.id", "console-consumer-8000");
        // key/value deserializers for consumed messages
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // where to start when the group has no committed offset
        props.setProperty("auto.offset.reset", "latest");
        // commit offsets automatically, every 2s
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        // a background thread checks for new Kafka partitions every 10s
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        // 3. Define the Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props);

        // 4. Start from the offsets committed by the consumer group
        consumer.setStartFromGroupOffsets();

        // 5. Use Kafka as the source
        DataStream<String> ds = env.addSource(consumer);

        // 6. Print the input
        ds.print("before");

        // 7. Transform: split each string on commas and emit every piece
        DataStream<String> flatMapDS = ds.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String str : arr) {
                    out.collect(str);
                }
            }
        });
        // 8. Print the result
        flatMapDS.print("after");

        // 9. Write the result back to Kafka
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("second", new SimpleStringSchema(), props);
        flatMapDS.addSink(kafkaSink);

        // 10. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("custom source job");
    }
}
  • Run results

Ⅰ. Producer on the topic first

Ⅱ. Output printed in IDEA

Ⅲ. Consumer on the topic second

2. RichMap & RichFlatMap
  • The Rich variants provide a richer API: they add open and close methods, commonly used to initialize and release connections

  • The open, close and setRuntimeContext methods of the RichXXX functions run once per parallel subtask

    • e.g. with a parallelism of 4, open/close each fire 4 times, once in each of the 4 subtasks
    • e.g. RichMapFunction, RichFlatMapFunction, RichSourceFunction
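To make the per-subtask lifecycle concrete without running a cluster, here is a plain-Java sketch (no Flink involved; the RichLike interface and the run helper are made up for illustration) of how each parallel subtask gets its own function instance, and therefore its own open/close calls:

```java
import java.util.ArrayList;
import java.util.List;

public class RichLifecycleSketch {
    // a stand-in for a rich function's lifecycle hooks
    interface RichLike {
        void open(List<String> log);
        String map(String s);
        void close(List<String> log);
    }

    // simulate: each parallel subtask gets its own function instance,
    // so open/close fire once per subtask
    static List<String> run(int parallelism) {
        List<String> log = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            RichLike fn = new RichLike() {
                public void open(List<String> l)  { l.add("open"); }
                public String map(String s)       { return "test" + s; }
                public void close(List<String> l) { l.add("close"); }
            };
            fn.open(log);
            log.add(fn.map("kafka"));
            fn.close(log);
        }
        return log;
    }

    public static void main(String[] args) {
        // with parallelism 2, "open" and "close" each appear twice
        System.out.println(run(2));
    }
}
```

This is why, with a parallelism of 2, the run results in this section show open/close printed once per subtask.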
2.1 The RichMap operator
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flink02 {
    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment: the job's entry point, holding job-wide parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 2. A stream of elements of the same type
        DataStream<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka");
        stringDS.print("before");
        // 3. Apply map with a rich function
        DataStream<String> mapStream = stringDS.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("====open====");
            }

            @Override
            public void close() throws Exception {
                System.out.println("====close====");
            }

            @Override
            public String map(String s) throws Exception {
                return "test" + s;
            }
        });
        // 4. Print the output
        mapStream.print("after");
        // 5. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("flink");
    }
}
  • Run result: with a parallelism of 2, open/close each print twice

2.2 The RichFlatMap operator
package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class flink02 {
    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment: the job's entry point, holding job-wide parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 2. A stream of elements of the same type
        DataStream<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka");
        stringDS.print("before");
        // 3. Apply flatMap with a rich function
        DataStream<String> flatmapStream = stringDS.flatMap(new RichFlatMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("====open====");
            }

            @Override
            public void close() throws Exception {
                System.out.println("====close====");
            }

            @Override
            public void flatMap(String value, Collector<String> collector) throws Exception {
                // 3.1 Split on commas
                String[] arr = value.split(",");
                // 3.2 Emit every piece
                for (String str : arr) {
                    collector.collect(str);
                }
            }
        });
        // 4. Print the output
        flatmapStream.print("after");
        // 5. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("flink");
    }
}
  • Run result: there are two subtasks

3. The KeyBy operator
  • keyBy: grouping & per-group statistics
    • keyBy partitions the data stream by a given field
    • after keyBy, records with the same key end up in the same group, and statistics are computed within that group
  • Requirement: compute the maximum temperature per id
id1,20210101,27.1
id2,20210101,26.1
id1,20210101,25.2
id3,20210101,25.1
id2,20210101,29.1
id1,20210101,33.1
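Before looking at the Flink code, the expected outcome of this grouping can be sketched in plain Java (no Flink involved; maxByKey is a hypothetical helper) using a HashMap:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyByMaxSketch {
    // group rows of {id, date, temperature} by id and keep the max temperature per id
    static Map<String, Double> maxByKey(List<String[]> rows) {
        Map<String, Double> max = new HashMap<>();
        for (String[] r : rows) {
            max.merge(r[0], Double.parseDouble(r[2]), Double::max);
        }
        return max;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"id1", "20210101", "27.1"},
                new String[]{"id2", "20210101", "26.1"},
                new String[]{"id1", "20210101", "25.2"},
                new String[]{"id3", "20210101", "25.1"},
                new String[]{"id2", "20210101", "29.1"},
                new String[]{"id1", "20210101", "33.1"});
        System.out.println(maxByKey(rows)); // per-id maxima: id1=33.1, id2=29.1, id3=25.1
    }
}
```

Flink computes the same per-key maxima, but incrementally: each arriving element updates the running maximum for its key and the updated record is emitted downstream.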
  • Hands-on code

① The SensorReading class

package com.lihaiwei.text1.model;

public class SensorReading {
    // 1. Private fields: id, timestamp, temperature
    // (the field name "temperatue" keeps the article's original spelling,
    //  since the keyBy/max/sum calls elsewhere reference it by that name)
    private String id;
    private Long timestamp;
    private Double temperatue;

    // 2. No-arg constructor
    public SensorReading() {
    }

    // 3. All-args constructor
    public SensorReading(String id, Long timestamp, Double temperatue) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperatue = temperatue;
    }

    // 4. Getters and setters
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperatue() {
        return temperatue;
    }

    public void setTemperatue(Double temperatue) {
        this.temperatue = temperatue;
    }

    // 5. Override toString for readable printing and easy inspection
    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperatue=" + temperatue +
                '}';
    }
}

② The main class

package com.lihaiwei.text1.app;

import com.lihaiwei.text1.model.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flink05keyby {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data file (backslashes in the Windows path must be escaped)
        DataStream<String> inputDataStream = env.readTextFile("C:\\Users\\PROTH\\Desktop\\shenme\\test5.txt");
        // 3. Parse each line into a SensorReading, group by id, keep the max temperature
        DataStream<SensorReading> mapStream = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        }).keyBy("id").max("temperatue");

        mapStream.print("after aggregation");
        // 4. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("keyby job");
    }
}
  • Run results

4. filter & sum

4.1 The filter operator
  • Use case: filtering by a condition

  • Requirement: keep only the records that start with "h"

  • Hands-on code

package com.lihaiwei.text1.app;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class flink04kafka {
    public static void main(String[] args) throws Exception {
        // 1. Build the execution environment: the job's entry point, holding job-wide parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Configure the Kafka connection
        Properties props = new Properties();
        // Kafka broker address
        props.setProperty("bootstrap.servers", "hadoop102:9092");
        // consumer group id
        props.setProperty("group.id", "console-consumer-8000");
        // key/value deserializers for consumed messages
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // where to start when the group has no committed offset
        props.setProperty("auto.offset.reset", "latest");
        // commit offsets automatically, every 2s
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        // a background thread checks for new Kafka partitions every 10s
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        // 3. Define the Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props);

        // 4. Start from the offsets committed by the consumer group
        consumer.setStartFromGroupOffsets();

        // 5. Use Kafka as the source
        DataStream<String> ds = env.addSource(consumer);

        // 6. Print the input
        ds.print("before");

        // 7. Keep only the strings that start with "h"
        DataStream<String> filterDS = ds.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("h");
            }
        });
        // 8. Print the result
        filterDS.print("after");

        // 9. Write the result back to Kafka
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("second", new SimpleStringSchema(), props);
        filterDS.addSink(kafkaSink);

        // 10. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("custom source job");
    }
}
  • Run results

① Producer on the topic first

② Output printed in IDEA

③ Consumer on the topic second

  • Note: filter before grouping and aggregating; this reduces memory consumption and the amount of data shuffled
4.2 sum
  • Basic rolling aggregation operators: sum(), max(), min(), maxBy(), minBy()

  • Hands-on code: chained calls

package com.lihaiwei.text1.app;

import com.lihaiwei.text1.model.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flink05keyby {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data file (backslashes in the Windows path must be escaped)
        DataStream<String> inputDataStream = env.readTextFile("C:\\Users\\PROTH\\Desktop\\shenme\\test5.txt");
        // 3. Parse each line into a SensorReading, group by id, sum the temperatures
        DataStream<SensorReading> mapStream = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        }).keyBy("id").sum("temperatue");

        mapStream.print("after aggregation");
        // 4. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("keyby job");
    }
}
  • Run results

5. The reduce operator (core)
  • Use case: aggregation after grouping; reduce can produce the same result as sum
    value1: the accumulated (historical) value
    value2: the newly arrived element being folded into the aggregate
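The folding behaviour of value1/value2 can be sketched for a single key in plain Java (no Flink involved; reduceAll is a made-up helper mirroring a summing ReduceFunction):

```java
public class ReduceFold {
    // mimic a keyed reduce for one key: value1 is the running aggregate
    // (history), value2 is the newly arrived element
    static double reduceAll(double[] temps) {
        double value1 = temps[0];            // the first element seeds the aggregate
        for (int i = 1; i < temps.length; i++) {
            double value2 = temps[i];        // the new element
            value1 = value1 + value2;        // same rule as a summing ReduceFunction
        }
        return value1;
    }

    public static void main(String[] args) {
        // the three id1 temperatures from the sample data
        System.out.println(reduceAll(new double[]{27.1, 25.2, 33.1}));
    }
}
```

In the streaming job this fold happens incrementally: each incoming element for a key is combined with the stored aggregate, and the updated aggregate is emitted downstream.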
  • Hands-on code:
package com.lihaiwei.text1.app;

import com.lihaiwei.text1.model.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flink05keyby {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data file (backslashes in the Windows path must be escaped)
        DataStream<String> inputDataStream = env.readTextFile("C:\\Users\\PROTH\\Desktop\\shenme\\test5.txt");
        // 3.1 Parse each line into a SensorReading
        DataStream<SensorReading> mapStream = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        });

        // 3.2 Group by id
        KeyedStream<SensorReading, String> keybyDS = mapStream.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading value) throws Exception {
                return value.getId();
            }
        });

        // 3.3 Custom aggregation rule after grouping
        SingleOutputStreamOperator<SensorReading> reduceDS = keybyDS.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                SensorReading sensorReading = new SensorReading();
                // the key: id
                sensorReading.setId(value1.getId());
                // the value: accumulated temperature plus the new reading
                sensorReading.setTemperatue(value1.getTemperatue() + value2.getTemperatue());
                return sensorReading;
            }
        });

        reduceDS.print("after aggregation");
        // 4. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("keyby job");
    }
}
  • Run results

  • Difference between reduce and sum: to be continued
6. max & maxBy | min & minBy
  • Difference: after keyBy, use the maxBy/minBy family in downstream operators if you need the whole record that attains the extreme within each group
    1. If you keyBy on an object field and aggregate with max/min, only the aggregated field is updated; the other fields keep their old values, so the emitted object can be inconsistent
    2. maxBy/minBy emit the entire record, so all of its fields come from the max/min element
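The difference can be sketched in plain Java (no Flink involved; maxField and maxBySketch are hypothetical helpers that mimic the two semantics for one key):

```java
public class MaxVsMaxBy {
    // a reading is {id, timestamp, temperature}

    // max("temperatue"): only the aggregated field is updated;
    // the other fields keep the values of the previous aggregate
    static String[] maxField(String[] agg, String[] next) {
        double best = Math.max(Double.parseDouble(agg[2]), Double.parseDouble(next[2]));
        return new String[]{agg[0], agg[1], String.valueOf(best)};
    }

    // maxBy("temperatue"): the whole record with the larger temperature wins
    static String[] maxBySketch(String[] agg, String[] next) {
        return Double.parseDouble(next[2]) > Double.parseDouble(agg[2]) ? next : agg;
    }

    public static void main(String[] args) {
        String[] a = {"id1", "1000", "27.1"};
        String[] b = {"id1", "2000", "33.1"};
        System.out.println(maxField(a, b)[1]);    // stale timestamp "1000"
        System.out.println(maxBySketch(a, b)[1]); // fresh timestamp "2000"
    }
}
```

This mirrors the behaviour of the two Flink pipelines in this section: the max stream can carry a stale timestamp, while the maxby stream always shows the full winning record.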
  • Hands-on code
package com.lihaiwei.text1.app;

import com.lihaiwei.text1.model.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class flink05keyby {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data file (backslashes in the Windows path must be escaped)
        DataStream<String> inputDataStream = env.readTextFile("C:\\Users\\PROTH\\Desktop\\shenme\\test5.txt");
        // 3.1 Parse, group by id, aggregate with max: only the temperature field is updated
        DataStream<SensorReading> mapStream = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        }).keyBy("id").max("temperatue");

        // 3.2 The same pipeline with maxBy: the whole record of the max element is emitted
        DataStream<SensorReading> mapStream2 = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        }).keyBy("id").maxBy("temperatue");

        mapStream.print("max");
        mapStream2.print("maxby");
        // 4. A DataStream job only runs when execute() is called; the job name is arbitrary
        env.execute("keyby job");
    }
}
  • Run results
