栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink输入输出&算子&FlinkCDC

Flink输入输出&算子&FlinkCDC

Flink

版本问题,flink使用Scala时,flink1.12兼容scala2.11,flink1.12不兼容scala2.12

一、简介

flink是对有界流和无界数据流进行有状态的计算(所谓状态,指的是之前的数据sparkStreaming中只有updatestateByKey有状态,但是flink中任何的算子都可以有状态,可以自己定义)

1.1sparkStreaming与flink的区别
(1)事件驱动型

sparkStreaming是时间驱动,一个批次中假如没有数据依然会从kafka拉取数据,走计算逻辑,只不过没有结果。
flink是事件驱动的(事件即指数据)来一条计算一条,不会浪费资源且实时性更好。(是和spark批处理的本质区别)

(2)流处理与批处理

spark处理数据都是按照批次处理的,如果每隔批次的时间较长,可认为是批处理,若每个批次相隔的时间很短,是微批处理来模拟的流处理。
flink的流与批是根据数据有无结束限定的。如果flink读取文本文件有开头有结尾是批处理(批处理时也是事件驱动型,来一条数据处理一条),如果从kafka读取数据,有开头没有结尾是流处理(流处理是事件驱动型)
flink使用时间窗口就能实现spark的微批处理。

(3)关于并行度的问题
spark在代码中可以更改并行度(即进行分区数的设置,但是只有一些特定的算子能进行指定,像map,flapmap是不能的)
flink中任何算子都能设置并行度(map.setParallelism(3))注意keyby没有,因为keyby是hash操作将数据送入后面的算子中。

 flink与spark任务划分的方式不同,spark根据产生shuffer操作会产生新的stage,而flink是只要算子之间的并行度不同就会产生新的stage
 
(4)

以上(3)结论的图[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KJpOUwix-1641372478560)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211210220337526.png)]

代码如下:读端口并行度为1,flatmap:3,keyby,sum:2,print:4是env设置的全局并行度

public class Flink03_WordCount_Unbounded {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        //读端口数据
        DataStreamSource sourceDS = env.socketTextStream("linux2", 9999);
        //读取的数据扁平化
        SingleOutputStreamOperator> tupleDS = sourceDS.flatMap(new Flink02_WordCount_Bounded.MyFlapMapFunc()).setParallelism(3);
        KeyedStream, String> keyDS = tupleDS.keyBy(new KeySelector, String>() {
            public String getKey(Tuple2 value) throws Exception {
                return value.f0;
            }
        });
        SingleOutputStreamOperator> result = keyDS.sum(1).setParallelism(2);
        result.print();
        //启动任务
        env.execute();

    }
}
1.2分层API

2020年12月8日发布的最新版本****1.12.0****, 已经完成实现了真正的****流批一体****

从简到易:
sql:SQL查询可以直接在Table API定义的表上执行。
table api:表可以动态变化,API提供可比较的操作,例如select、project、join、group-by、aggregate等,用户可自定义函数(UDF)进行扩展,Table API程序在执行之前会经过内置优化器进行优化。
datastream/dataSet API:DataStream/DataSet 与表可以无缝切换。
process function:最底层的API,算子无法实现的需要使用最底层process编写程序。
sql也能应用与流处理
1.3flink 流批处理不同
flink不向spark,没有reducrByKey,分组和聚合是分开的,先groupby,再聚合。?
flink流(StreamExecutionEnvironment)处理与批(ExecutionEnvironment)处理所需的执行环境不同。
flink批处理有groupby,流处理只有keyBy。
flink批处理也是来一条执行一条,只不过最后输出一次结果,flink流处理来一条执行一条输出一次结果。
1.4 flink 部署模式
(1)local模式

(2)standaloan模式:提交任务前要先启动flink集群。是flink自己提供计算资源,不依赖于其他框架,可在没有hadoop节点中单独安装flink集群
flink-conf.xml配置文件:
1、可配置taskmanager总共内存,单节点可用的插槽,默认并行度
2、jobmanager的高可用,包含zk的配置
3、状态后端:指的是存储状态的地址,通常有3种,jobmanager(是内存级别的),hdfs,rocksdb(是指一种kv的存储格式,且支持增量的操作,是最常用的)

(3)yarn模式:用yarn做资源的分配,基于hadoop集群之上。
flink基于yarn墨水有3种提交作业的方式
① session-cluster:启动flink集群后向yarn申请得到资源,这个资源是永远不变的,当提交多个任务时,会共用申请到的资源,如果前面的job用尽了申请到的slot且任务不结束,后面的job则申请不到资源无法执行。此种模式适合job比较短的需要频繁提交的批处理任务。
备注:session和per-job的区别:session是一次性申请flink集群资源,所有的任务用这一个资源,job运行完后下一个任务只需要去获得slot即可,而per-job需要针对每隔任务都去yarn申请资源,所以per-job模式提交会比session提交反应慢。(数据库连接池与普通获取数据库连接方式的区别吧)
② application mode:client
备注:application和per-job区别:用户自主程序运行的位置。application相当于spark的client,用户住程序运行在本地的jobmanager
③ per-job-cluster:cluster,用的最多的是per-job-cluster
1.5 flink运行架构
1、jobManager:作用
①. 响应客户端请求
②.向resureManager申请资源
③.将job生成作业图-数据流图-执行图(切分任务)
④.分发执行图到taskManager上,负责任务的协调

2、resourceManager:
①.此处是flink单独的resourceManger,不是yarn的
②.flink的resourceManager和jobManager运行在同一个节点上

3、dispatcher:分发器:
只有能在网页上提交任务的模式才有触发器

4、taskManager:tm上有slot插槽,slot是对tm的内存资源进行隔离,并不会隔离cpu

5、并行度
一个流程序的并行度是代码算子中最大的并行度,而每个算子可以有不同的并行度,每个算子的并行度就叫subtask
stream在算子之间传输数据的形式有两种:
① one-to-one:source--map(flatMap)算子之间没有重分区是one-to-one的对应关系,类似spark的窄依赖。
②Redistributing:flatMap-keyBy(or window)有重分区,每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,类似spark的宽依赖。

6、Operator Chains(任务链)
类似spark中的stage
但是flink的任务链划分和spark中stage划分是不一样的。
想要算子划分到同一个任务链中必须满足以下3个条件:
第一:算子没有产生重分区
第二:算子之间的并行度要相同
第三:算子属于同一个共享组
这样划分的目的是为了减少网络IO的。
7、共享组
//全局并行度为 1
// souorce --flatMap.slotSharingGroup("group1")--map --keyBy--sum.slotSharingGroup("group2")--print
map是在default还是在group1组?
任务链划分如下:
souorce / flatMap及map在group1组 /keyBy及sum及print在default组

思考:

slot的个数通常是job中最大的并行度,这里并行度是2,却使用了5个slot,原因是什么?

是不同算子设置了不同的共享组。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yqSJz6Bj-1641372478561)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214213600616.png)]

例1:
//全局并行度为 1
// souorce --flatMap.slotSharingGroup("group1")--map.setParallelism(3) --keyBy--sum.slotSharingGroup("group2")--print

会切分4个任务链,使用5个slot(可认为共享组有就近原则?)
所以之前说的slot的数量是算子中的最大并行度是同一个共享组的时候,完善应是:
任务所需的slot数量是每个共享组中最大的并行度之和。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GiAr61RG-1641372478561)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214223635769.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pjDK6lnH-1641372478562)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214223957658.png)]

例2:
代码中设置:
//设置全局禁用任务链
env.disableOperatorChaining();
keyBy和sum是一个操作在一起,其余的任务链都分开了。需要3个slot
此外可针对算子单独设置:
flatMap.startNewChain()是和前边的划分开,出来一个新任务链。
flatMap.disableChaining()是前后都划分开,自己单独一个任务链。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a50HRZFF-1641372478562)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214224757409.png)]

flatmap开启新链条

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hh5NT52K-1641372478563)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214225535380.png)]

flatmap.disableChaining()

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2kVd3ADE-1641372478563)(C:UsersjingAppDataRoamingTyporatypora-user-imagesimage-20211214225713026.png)]

总结:

任务链及共享组总结:
1、默认情况下
  所有算子都在一个共享组内,开启了任务链和并
	任务需要的slot数量:最大并行度
	任务链的个数:要看有无宽依赖,算子的并行度是否相同
2、	给算子独立设置了共享组
  开启了任务链合并(就近)
  所需的slot数量:每个共享组中最大并行度之和
	任务链的个数:宽依赖,并行度,共享组
3、全局禁用任务链
  任务就是在一个共享组内
	任务所需的slot数量:最大并行度
	任务链的个数:算子的个数
4、给某个算子开启新的任务链
  同一个共享组
	任务所需的slot数量:最大并行度
	任务链的个数:宽依赖,并行度,开启的新任务链的算子
5、给某个算子禁用任务链
  同一个共享组
	任务所需的slot数量:最大并行度
	任务链的个数:宽依赖,并行度,禁用任务链的算子
总之:
	slot:只跟共享组和最大并行度有关
	任务链的个数:跟宽依赖,并行度,并行度,时否开启和禁用任务链
	

subtask

默认共享组:
同一个算子的subtask肯定是在不同的slot执行的,因为flink是从任务的空闲时间和资源共享的角度考虑的
例如一个任务两个并行度
slot1        slot2
source       flatmap
flapmap      map
map          keyBy
keyBy
slot1中的算子是从上到下执行的,前边的执行完后空出资源,后边的算子可继续使用,且在同一个节点上
减少了网络传输且资源进行了充分利用,如果两个flatmap放一起,忙的时候都忙闲的时候都闲会争抢资源。
1.6 flink 流处理核心编程 1.6.1 environment
// 批处理环境
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// 流式数据处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1.6.2 source
@Test
//从集合中读数据
      public void SourceCollection() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //准备集合
        List waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),
                new WaterSensor("ws_002", 1577844015L, 43),
                new WaterSensor("ws_003", 1577844020L, 42));
        //从集合读数据
        DataStreamSource waterSensorDataStreamSource = env.fromCollection(waterSensors);
        waterSensorDataStreamSource.print();
        env.execute();
      }
//从文件读数据
      @Test
      public void SourceFile() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource wordcount = env.readTextFile("F:\bigdata\flink\flink_learn_code\src\main\resources\input\wordcount");
        wordcount.print();
        env.execute();
      }
//从端口读数据
      @Test
      public void SourceScoket() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource wordcount = env.socketTextStream("linux1",9999);
        wordcount.print();
        env.execute();
      }
//从kafka读数据
    @Test
    public void SourceKafka() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092,linux2:9092,linux3:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"flinkkafka");
        DataStreamSource kafkaSD = env.addSource(new FlinkKafkaConsumer("sink_table", new SimpleStringSchema(), properties));
        kafkaSD.print();
        env.execute();
    }

自定义source

package operateFunction;

import bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class MySurce {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .addSource(new MySource("linux1", 9999))
                .print();

        env.execute();
    }

    public static class MySource implements SourceFunction {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }


        @Override
        public void run(SourceContext ctx) throws Exception {
            // 实现一个从socket读取数据的source
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = reader.readLine();
            while (isRunning && line != null) {
                String[] split = line.split(" ");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
                line= reader.readLine();
            }
        }

        

        @Override
        public void cancel() {
            isRunning = false;

                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
1.6.3 transforms ①对数据操作的算子
map
flatmap:一进多出,炸裂

所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
②重分区算子
package operateFunction;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TranceformRebalance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);
        SingleOutputStreamOperator map = socketDS.map(x -> x).setParallelism(2);
        map.print("map").setParallelism(2);

//        socketDS.keyBy(data -> data).print("keyby");
        //随机分区
//        socketDS.shuffle().print("shuffer");
        //rebalance map1是先找一个开始分区,比如5,然后 45-12345-12345  map2:345-12345-12345
        map.rebalance().print("rebalance");
        //rescale 是先将分区平分,然后数据在每个分区内轮询。map1: 45-345-345 map2:12-12-12-12
        map.rescale().print("rescale");
        //并行度是1
//        socketDS.global().print("gloal");
        //此处会报错,8个并行度往socket一个并行度发会报错 forward 上下算子的并行度要相同
        //socketDS.forward().print("forward");
        //broadcast发往下游所有分区,每个一份
//        socketDS.broadcast().print("broadcast");
        //自定义重分区
        env.execute();
    }
}

③控制流的算子

split切分后类型不可变,output侧输出流类型是可变的

split(数据类型不可变)和select:可以使用output侧输出流(数据类型可以改变)代替他

将流中数根据某一特征划分,给流打上标记,然后根据select能筛选出被打上指定标记的流(1.12已移除)
connect:
允许连接两个不同类型的流(String,Integer),只能连接两个流,连接后依然保持着各自的数据类型。
union:只能连接相同类型的流,可以连接2个以上甚至多个流。
返回的是ConnectedStreams类型,后面可以调用map,flatmap,process方法。
join:经常配合开窗操作
connect算子
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource intStream = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource stringStream = env.fromElements("a", "b", "c");
ConnectedStreams connect = intStream.connect(stringStream);
        // TODO ConnectedStreams 的map flatmap需要传CoMapFunction
        connect.map(new CoMapFunction() {
            @Override
            public Object map1(Integer value) throws Exception {
                return value;
            }

            @Overrides
            public Object map2(String value) throws Exception {
                return value;
            }
        });
④聚合算子
集合算子要在keyBy之后
常见的滚动聚合算子
sum, min,max,minBy,maxBy
如果流中存储的是POJO或者scala的样例类, 参数使用字段名
如果流中存储的是元组, 参数就是位置(基于0...)
reduce

//max只改变当前统计的这一个字段,其他字段会保持第一次输出的字段值不变
SingleOutputStreamOperator vc = keyedDS.max("vc");
//maxBy除了改变统计字段取最大值,如果两条数据最大值相同,其他值不同,其他字段的值通过后一个参数设置
//first参数:不是是取第一个
如果是false:取本条数据的字段,若是TRUE,取上一个最大值中的字段
SingleOutputStreamOperator vc1 = keyedDS.maxBy("vc",first=false);

reduce:
如果WaterSensor 温度和时间都要最大的:可使用reduce更灵活
SingleOutputStreamOperator reduceDS = keyedDS.reduce(new ReduceFunction() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                //id:keyby后的id都一样,随便取其中一个,ts:取最大时间 vc:取最大水位
                return new WaterSensor(value1.getId(), value2.getTs(), Math.max(value1.getVc(), value2.getVc()));
            }
        });

⑤富函数
示例:
RichMapFunction:
1、open close生命周期方法:
程序在和其他第三方数据库有交互时使用,全局在每个并行度上只调用一次。
比如要在map中读完kafka数据写到mysql,map每次处理一条数据都创建关闭一次连接,可以使用声明周期方法中的open(创建连接方法) close(关闭连接方法)
2、getRuntimeContext() 获取运行时上下文:
可以做状态编程

⑥ process
最后process算子:
其他算子都搞不定,可以用最底层的process算子
package operateFunction;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TransformProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource sockDS = env.socketTextStream("linux1", 9999);
        //process 实现flapmap功能
        SingleOutputStreamOperator flatMapDS = sockDS.process(new MyFlatmapProcess());
        //process 实现pmap功能
        SingleOutputStreamOperator> wordTooneDS = flatMapDS.process(new MapProcessFunction());
        KeyedStream, String> keyedDS = wordToOneDS.keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple2 value) throws Exception {
                return value.f0;
            }
        });
        SingleOutputStreamOperator> sum = keyedDS.sum(1);
        sum.print();

        env.execute();
    }
    public static class MyFlatmapProcess extends ProcessFunction {

        @Override
        public void processElement(String value, Context ctx, Collector out) throws Exception {
            String[] split = value.split(" ");
            for (String word:split) {
                out.collect(word);
            }
            //运行时上下文,状态编程
            RuntimeContext runtimeContext = getRuntimeContext();
            // //定时器
            TimerService timerService = ctx.timerService();
            timerService.registerEventTimeTimer(123L);
            timerService.deleteEventTimeTimer(123L);

            //获取当前处理数据的时间
            timerService.currentProcessingTime();
            //事件时间
            timerService.currentWatermark();

            //侧输出流
            //ctx.output();



        }
        //process中特殊的方法
        //生命周期方法
        @Override
        public void open(Configuration parameters) throws Exception { }
        @Override
        public void close() throws Exception { }


    }

    public static class MapProcessFunction extends ProcessFunction>{

        @Override
        public void processElement(String value, Context ctx, Collector> out) throws Exception {
            out.collect(new Tuple2<>(value,1));
        }
    }

}
练习:实现去重操作

使用hashSet做去重,但是当数据量大时hashSet是内存中的变量撑不住,考虑使用外部框架Redis,注意:使用Redis时也要先做keyBy操作。(避免并发,同时查不到的问题)

package exercise;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.HashSet;

public class Distinct {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        //去重的方式:使用set去重,但是注意,这种方式如果多并行度的话还是会对此输出,每个并行度都会输出一次相同的单词
        //优化,在使用set前先加一个keyBy这样同样的单词就进入到了同一个分区
        DataStreamSource stringDS = env.socketTextStream("linux1", 9988);
        HashSet hashSet = new HashSet<>();

        stringDS
                .flatMap(new FlatMapFunction() {
                    @Override
                    public void flatMap(String value, Collector out) throws Exception {
                        String[] splits = value.split(" ");
                        for (String split:splits) {
                            out.collect(split);
                        }
                    }
                })
                .keyBy(new KeySelector() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .filter(new FilterFunction() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        if(hashSet.contains(value)){
                            return false;
                        }else{
                            hashSet.add(value);
                            return true;
                        }
                    }
                })
                .print();

        env.execute();
    }
}
1.6.4 sink

flink中使用kafka source +ck+kafka sink 能实现精准一次性

① kafka sink
package sink;

import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092");
        socketDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] split = value.split(" ");
                    out.collect(new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])));
            }
        })
        .map(new MapFunction() {
            @Override
            public String map(WaterSensor value) throws Exception {
                return JSON.toJSonString(value);
            }
        })
         .addSink(new FlinkKafkaProducer("sink_table",new SimpleStringSchema(),properties));


        env.execute();
    }
}
输入:wa_1 12345678 45
输出:{"id":"wa_1","ts":12345678,"vc":45}
② Redis sink
package sink;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092");
        SingleOutputStreamOperator waterDS = socketDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] split = value.split(" ");
                out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        });
        //FlinkJedisConfigbase flinkJedisConfigbase, RedisMapper redisSinkMapper
        // FlinkJedisPoolConfig 是实现类,构造器私有化,通常使用build方法创建对象
        FlinkJedisPoolConfig flinkJedis = new FlinkJedisPoolConfig.Builder()
                .setHost("linux2")
                .setPort(6379)
                .build();

        DataStreamSink redisSinknDS = waterDS.addSink(new RedisSink<>(flinkJedis, new MyRedisMapper()));
        redisSinknDS.setParallelism(1);

        env.execute();
    }

    public static  class MyRedisMapper implements RedisMapper{

        @Override
        //指定在Redis中存储的数据类型
        public RedisCommandDescription getCommandDescription() {
            //addotionalKey:"sensor" 就是给hash用的,addotionalKey就成了Redis的外部key ,getKeyFromData 就成了内部的field
            return new RedisCommandDescription(RedisCommand.HSET,"sensor");
        }

        @Override
        //从数中获取key
        public String getKeyFromData(WaterSensor data) {
            return data.getId();
        }

        @Override
        //从数据中获取value
        public String getValueFromData(WaterSensor data) {
            return data.getVc().toString();
        }
    }
}

③ ES Sink
package sink;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;

public class EsSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        // TODO 1、如果是无界流
        //对于无界流来说,会等到触发刷新出去的时间或条数才会写出,这个需要参数设置
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);
        // TODO 2、如果是有界流
        // 对于有界流来说,有界流最后会关闭,将数据刷新出去
        DataStreamSource fileDS = env.readTextFile("F:\bigdata\flink\flink_learn_code\src\main\resources\input\waterSensor");


        SingleOutputStreamOperator waterDS = socketDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] split = value.split(" ");
                out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        });
        //将数据写入es
        //List httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction
        ArrayList httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("linux1", 9200));
        httpHosts.add(new HttpHost("linux2",9200));
        httpHosts.add(new HttpHost("linux3",9200));

        ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new MyElasticsearchSinkFunction());
        // TODO 1、设置批量提交参数 每一条数写出一次,适用于测试,生产频繁访问数据库效率很慢
        esSinkBuilder.setBulkFlushMaxActions(1);
        ElasticsearchSink buildES = esSinkBuilder.build();


        waterDS.addSink(buildES);
        env.execute();

    }

    public static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction {
        @Override
        public void open() throws Exception {
        }

        @Override
        public void close() throws Exception {
        }

        @Override
        //处理每一条数据
        //RuntimeContext 是关于es的参数,可以新增修改和删除
        public void process(WaterSensor waterSensor, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            IndexRequest source = Requests.indexRequest()
                    .index("sensor1")
                    .type("_doc")
                    .source(waterSensor);//要输出的数据,此处可以写一个Javabean对象
            requestIndexer.add(source);
        }
    }
}
④自定义JDBC Sink
package sink;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;


public class SelfSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);

        SingleOutputStreamOperator waterDS = socketDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] split = value.split(" ");
                out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        });

        waterDS.addSink(new MySink());


        env.execute();
    }

    //用RichSinkFunction 有生命周期方法,获取和关闭连接执行一次,效率高
    //另外后面结合状态编程,可以在指定时间或指定条数到达后再批量写入数据库 (用定时器+list集合个数实现)
    public static class MySink extends RichSinkFunction{
        private  Connection connection;
        private PreparedStatement preparedStatement;

        //创建连接
        @Override
        public void open(Configuration parameters) throws Exception {
            connection= DriverManager.getConnection("jdbc:mysql://linux1:3306/realwarehouse?useSSL=false","root","000000");
             preparedStatement = connection.prepareStatement("INSERT INTO `sensor` values(?,?,?) ON DUPLICATE KEY UPDATe `ts`=?,`vc`=?");
        }

        @Override
        public void invoke(WaterSensor value, Context context) throws Exception {
            //占位符赋值
            preparedStatement.setString(1,value.getId());
            preparedStatement.setLong(2,value.getTs());
            preparedStatement.setInt(3,value.getVc());

            // ON DUPLICATE KEY UPDATE 的两个占位符
            preparedStatement.setLong(4,value.getTs());
            preparedStatement.setInt(5,value.getVc());
            preparedStatement.execute();

        }

        //关闭连接
        @Override
        public void close() throws Exception {
           preparedStatement.close();
           connection.close();
           //如果是查询语句,查询返回的resultSet也要关闭
        }
    }
}

⑤ flink_JDBC Sink
package sink;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class FlinkJDBCSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource socketDS = env.socketTextStream("linux1", 9999);

        final SingleOutputStreamOperator waterDS = socketDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] split = value.split(",");
                out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        });

        //JdbcExecutionOptions 4 个参数的sink是批量操作的
//        private final long batchIntervalMs;
//        private final int batchSize;
//        private final int maxRetries;
        waterDS.addSink(JdbcSink.sink(
                "INSERT INTO `sensor-0821` VALUES(?,?,?) ON DUPLICATE KEY UPDATE `ts`=?,`vc`=?",
                new JdbcStatementBuilder() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //占位符赋值
                        preparedStatement.setString(1,waterSensor.getId());
                        preparedStatement.setLong(2,waterSensor.getTs());
                        preparedStatement.setInt(3,waterSensor.getVc());
                        preparedStatement.setLong(4,waterSensor.getTs());
                        preparedStatement.setInt(5,waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder().withMaxRetries(3).build()
                ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://linux1:3306/realwarehouse?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()
                ));

        env.execute();
    }
}
1.6.5 lombook插件
 
Flink CDC 
1.1CDC种类 

基于查询的CDC & 基于binlog的CDC,主要区别在于:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力
flink CDC内置了Debezium,
flinkCDC的初始化方式是可以选择的,有类似于Maxwell同步历史数据的功能,且此功能是选择参数可自定开启的,不用手动自己执行,同步历史数据时是通过查询的方式获得所有历史数据,同步完之后立即切换到binlog模式。且有断点续传和故障恢复功能。
flinkCDC的内容官网没有,是在git上的
https://github.com/ververica/flink-cdc-connectors
1.2 flinkCDC代码编写
package fenkong.realwarehouse.util;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        
        //ck 不做ck 每次都会从最新的位置开始读取数据
        env.enableCheckpointing(5000L); //每5秒做一次ck
        //ck 类型:EXACTLY_onCE 精准一次性 另一种是 AT_LEAST_onCE 最少一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置存储的状态后端
        env.setStateBackend(new FsStateBackend("hdfs://linux1:9820/flink/CDC/ck"));
        //设置重启策略 相当于无限重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,1000L));
        //访问hdfs,设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME","root");
        //如果从页面cancal的任务,是会删除ck的,但是取消的任务也是需要保留ck的,加一个参数
  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


        
        //创建mysql cdc的source
        Properties properties = new Properties();
        //scan.startup.mode","initial 每次程序重启都要重新读一遍数据库,太费时且可能会产生重复数据,做ck后可解决此问题
        properties.setProperty("scan.startup.mode","initial");
        DebeziumSourceFunction mysqlSource = MySQLSource.builder()
                .hostname("linux1")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("realwarehouse")
                //注意必须是db.table的方式,只写table读取不成功
                //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据
                .tableList("realwarehouse.customer")
                //flink cdc内置使用了debezium 故可以配置它的一些属性信息,包括CDC读取的初始化方式
                .debeziumProperties(properties)
                //可自定义读取数据的类型 StringDebeziumDeserializationSchema 因为这里是返回的string,另一种提供的是RowDataDebeziumDeserializeSchema
                //支持自定义反序列化器,后续会写案例
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //读取mysql数据
        DataStreamSource stringDS = env.addSource(mysqlSource);
        //打印 (或者转成map,cdc流与数据流join,即能实现动态读取mysql中的配置表)
        stringDS.print();
        //执行
        env.execute();
    }

}

问题处理:

Caused by: java.lang.ClassNotFoundException: fengkong.realwarehouse.util.FlinkCdcTest

因为flinkCDC不是flink的核心包的东西,打包的时候需要把

com.alibaba.ververica
flink-connector-mysql-cdc
1.1.1

打包到项目代码中,有两种方式:
第一:把此包下载下来,放到flink的lib目录下重启flink集群(此处用的测试flink—standalone模式)
第二:在pom文件中引入打包插件,将需要的东西全下载下来,一起打包。
1.3 任务提交及做ck提交命令
1、启动flink程序
bin/flink run -c fenkong.realwarehouse.util.FlinkCDC /opt/software/realdata_flink-12-18-02.jar
hdfs://linux1:9820

2、复制下flink任务id,做ck操作,需要执行一下命令
bin/flink savepoint 7f5351a08ed5f01dd6ebab66d79a784b hdfs://linux1:9820/flink/CDC/ck
执行完后会有hdfs://linux1:9820/flink/CDC/ck/savepoint-2e43d4-cc7a8bade02b 重启的时候要用它来做

3、任务重启
bin/flink run -s hdfs://linux1:9820/flink/CDC/ck/savepoint-7f5351-3139b82033ff -c fenkong.realwarehouse.util.FlinkCDC /opt/software/realdata_flink-12-18-02.jar

1.4 用flinkSQL的方式实现此功能
package fenkong.realwarehouse.util;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //创建执行环境(流环境和table环境)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //ck
        //
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id INT," +
                "  password STRING," +
                "  mobile STRING,"+
                "  add_time INT," +
                "  app_type INT"+
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'linux1'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '000000'," +
                "  'database-name' = 'realwarehouse'," +
                "  'table-name' = 'customer'" +
                ")");

        TableResult tableResult = tableEnv.executeSql("select * from user_info");
        tableEnv.executeSql("select * from user_info").print();
        env.execute();
        //打印数据
// -----------+-------------+-------------+-------------+-------------+-----------+
//  | op |          id |       password |          mobile |    add_time |    app_type |
// -----------+-------------+-------------+-------------+-------------+-----------+
// | +I |         24830 |      4561 |         18732365952 |  1639547481 |           1 |
// | +I |         24831 |      1261 |         1777888952 |  1639547482 |           1 |
// | +I |         24832 |      2261 |         1677888952 |  1639547483 |           1 |
        
    }
}

1.5 自定义序列化

1.2中使用的string类型返回mysql的binlog数据,但是返回的是两个对象,没法对数据解析,需要自定义反序列化器。

  // .deserializer(new MyDebezium())
    public static class MyDebezium implements DebeziumDeserializationSchema{

        //反序列化,将返回的两个对象封装成json返回
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
            //db & table
            String topic = sourceRecord.topic();
            String[] split = topic.split("\.");
            String db = split[1];
            String table = split[2];
            //获取操作类型
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            //获取数据data
            Struct value =(Struct) sourceRecord.value();
            Struct after = value.getStruct("after");
            JSonObject dataAfter = new JSonObject();
            if (after != null){
                Schema schema = after.schema();
                for(Field fields : schema.fields()){
                    Object o = after.get(fields);
                    dataAfter.put(fields.name(),o);
                }
            }

            Struct before = value.getStruct("before");
            JSonObject dataBefore = new JSonObject();
            if (before != null){
                Schema schema = before.schema();
                for(Field fields : schema.fields()){
                    Object o = before.get(fields);
                    dataBefore.put(fields.name(),o);
                }
            }

            //创建JSON对象用于封装最终返回值数据信息
            JSonObject result = new JSonObject();
            result.put("database",db);
            result.put("table",table);
            result.put("data",dataAfter);
            result.put("dataBefore",dataBefore);
            result.put("operation",operation.toString().toLowerCase());//转成小写

            collector.collect(result.toJSonString());
        }

        //定义返回类型
        @Override
        public TypeInformation getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
insert类型:只有data
update类型:有before有after
delete类型:只有before
1.6 采用广播流做动态分流

动态分流另一种方案:flink 定时读取mysql,更新map,然后完成后续的建表插入数据的操作。

此处使用flinkCDC监听mysql binlog,形成一条数据流,这条流广播后和主流回合。(更优)

 

ter.get(fields);
dataAfter.put(fields.name(),o);
}
}

        Struct before = value.getStruct("before");
        JSonObject dataBefore = new JSonObject();
        if (before != null){
            Schema schema = before.schema();
            for(Field fields : schema.fields()){
                Object o = before.get(fields);
                dataBefore.put(fields.name(),o);
            }
        }

        //创建JSON对象用于封装最终返回值数据信息
        JSonObject result = new JSonObject();
        result.put("database",db);
        result.put("table",table);
        result.put("data",dataAfter);
        result.put("dataBefore",dataBefore);
        result.put("operation",operation.toString().toLowerCase());//转成小写

        collector.collect(result.toJSonString());
    }

    //定义返回类型
    @Override
    public TypeInformation getProducedType() {
        return TypeInformation.of(String.class);
    }
}

insert类型:只有data
update类型:有before有after
delete类型:只有before

## 1.6 采用广播流做动态分流

动态分流另一种方案:flink 定时读取mysql,更新map,然后完成后续的建表插入数据的操作。

此处使用flinkCDC监听mysql binlog,形成一条数据流,这条流广播后和主流回合。(更优)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699844.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号