Version note: when using Flink with Scala, match the Scala version of the Flink artifacts. These notes pair Flink 1.12 with Scala 2.11; the same setup does not work with Scala 2.12.
1. Introduction
Flink performs stateful computations over bounded and unbounded data streams. ("State" here means information carried over from earlier data. In Spark Streaming only updateStateByKey is stateful, whereas in Flink any operator can hold state, and the state can be user-defined.)
1.1 Differences between Spark Streaming and Flink
(1) Event-driven processing. Spark Streaming is time-driven: even if a batch contains no data, it still pulls from Kafka and runs the computation logic, just producing no result. Flink is event-driven (an event is a record): it computes each record as it arrives, wasting no resources and achieving better latency. This is the essential difference from Spark's batch processing.
(2) Stream vs. batch. Spark always processes data in batches: with a long batch interval it is batch processing; with a very short interval it is micro-batching that simulates streaming. In Flink, stream vs. batch depends on whether the data is bounded. Reading a text file, which has a beginning and an end, is batch processing (still event-driven, one record at a time); reading from Kafka, which has a beginning but no end, is stream processing (also event-driven). With time windows, Flink can reproduce Spark's micro-batch behavior.
(3) Parallelism. In Spark, parallelism can be changed in code by setting the number of partitions, but only on certain operators; map and flatMap cannot be set. In Flink, any operator can have its own parallelism, e.g. map(...).setParallelism(3). Note that keyBy has no parallelism of its own, because keyBy is just a hash operation that routes data into the downstream operator. Task splitting also differs: Spark starts a new stage at each shuffle, while Flink starts a new stage whenever adjacent operators have different parallelism.
(Figure illustrating point (3) above; the original image is not available.)
The code below demonstrates this: the socket source has parallelism 1, flatMap 3, keyBy/sum 2, and print uses the global parallelism of 4 set on the environment.
public class Flink03_WordCount_Unbounded {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Read data from a socket (a socket source always has parallelism 1)
        DataStreamSource<String> sourceDS = env.socketTextStream("linux2", 9999);
        // Flatten each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS =
                sourceDS.flatMap(new Flink02_WordCount_Bounded.MyFlapMapFunc()).setParallelism(3);
        KeyedStream<Tuple2<String, Integer>, String> keyDS = tupleDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyDS.sum(1).setParallelism(2);
        result.print();
        // Launch the job
        env.execute();
    }
}
1.2 Layered APIs
The latest release at the time of writing, **1.12.0** (December 8, 2020), completed truly **unified stream/batch processing**.
From top (simplest) to bottom (most expressive):
- SQL: queries can be executed directly on tables defined with the Table API.
- Table API: tables can change dynamically; the API offers relational operations such as select, project, join, group-by and aggregate, can be extended with user-defined functions (UDFs), and Table API programs pass through a built-in optimizer before execution.
- DataStream/DataSet API: DataStream/DataSet and tables convert seamlessly into each other.
- Process functions: the lowest-level API; whatever the regular operators cannot express is written with process functions.
SQL applies to stream processing as well.
1.3 Flink stream vs. batch processing
Unlike Spark, Flink has no reduceByKey: grouping and aggregation are separate steps, first groupBy, then the aggregation. Stream processing (StreamExecutionEnvironment) and batch processing (ExecutionEnvironment) require different execution environments. Batch processing has groupBy; stream processing only has keyBy. Flink batch jobs also process one record at a time, but emit the result only once at the end, whereas Flink streaming emits a result for every record. A minimal sketch contrasting the two follows.
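The following sketch contrasts the two environments on a WordCount (the input path is hypothetical; everything else uses standard Flink 1.12 APIs):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchVsStreamSketch {
    public static void main(String[] args) throws Exception {
        // Batch environment: group with groupBy, the result is printed once at the end
        ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
        benv.readTextFile("input/wordcount")    // hypothetical bounded input
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split(" ")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .groupBy(0)   // batch API: groupBy, not keyBy
            .sum(1)
            .print();     // DataSet.print() triggers execution itself; no execute() call needed
    }
}
The streaming counterpart is the keyBy-based WordCount shown earlier: same logic, but an updated result is emitted for every incoming record.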
1.4 Flink deployment modes
(1) Local mode.
(2) Standalone mode: the Flink cluster must be started before submitting jobs. Flink provides its own compute resources, does not depend on any other framework, and can be installed on nodes that have no Hadoop. The flink-conf.yaml configuration file covers:
1. total TaskManager memory, the slots available per node, and the default parallelism;
2. JobManager high availability, including the ZooKeeper settings;
3. the state backend, i.e. where state is stored. There are usually three choices: JobManager (in-memory), HDFS, and RocksDB (a key-value storage format that supports incremental operations; the most commonly used).
(3) YARN mode: YARN allocates the resources, on top of a Hadoop cluster. Flink on YARN has three ways to submit jobs (sketched in the commands below):
① session-cluster: a Flink cluster is started and acquires a fixed amount of resources from YARN. All submitted jobs share these resources; if earlier jobs occupy all slots and do not finish, later jobs cannot get resources and will not run. This mode suits short, frequently submitted jobs.
Note on session vs. per-job: a session requests cluster resources once and all jobs share them, so each new job only needs to grab a slot, whereas per-job requests resources from YARN for every single job; per-job submission therefore reacts more slowly than session submission (roughly the difference between a connection pool and opening a fresh database connection each time).
② application mode: the difference from per-job is where the user's main program runs; in application mode main() runs on the cluster-side JobManager rather than on the client.
③ per-job-cluster: one cluster per job; this is the most widely used mode.
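As a sketch, the three submission styles correspond to commands like these on a typical Flink 1.12 installation (the jar name and main class are placeholders):
# session mode: start a long-running session on YARN, then submit jobs into it
bin/yarn-session.sh -d
bin/flink run -c com.example.MyJob myjob.jar
# per-job mode: a dedicated YARN cluster per job
bin/flink run -t yarn-per-job -c com.example.MyJob myjob.jar
# application mode: main() runs on the cluster-side JobManager
bin/flink run-application -t yarn-application -c com.example.MyJob myjob.jar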
1.5 Flink runtime architecture
1. JobManager. Its roles: ① respond to client requests; ② request resources from the ResourceManager; ③ turn the job into the JobGraph → dataflow graph → ExecutionGraph (splitting it into tasks); ④ distribute the ExecutionGraph to the TaskManagers and coordinate the tasks.
2. ResourceManager. ① This is Flink's own ResourceManager, not YARN's. ② Flink's ResourceManager runs on the same node as the JobManager.
3. Dispatcher. Only deployment modes that accept job submission through the web UI have a dispatcher.
4. TaskManager. A TaskManager offers slots; a slot isolates the TaskManager's memory, but does not isolate CPU.
5. Parallelism. The parallelism of a stream program is the maximum parallelism among its operators; each operator can have a different parallelism, and each parallel instance of an operator is a subtask. Data travels between operators in two ways:
① One-to-one: between source and map/flatMap there is no repartitioning, a one-to-one correspondence similar to Spark's narrow dependencies.
② Redistributing: between flatMap and keyBy (or window) there is repartitioning; every subtask sends data to different target subtasks depending on the chosen transformation. For example, keyBy() repartitions by hashCode, and broadcast and rebalance repartition randomly. All of these cause a redistribute, similar to Spark's wide dependencies.
6. Operator chains. Similar to stages in Spark, but Flink's chaining rules differ from Spark's stage splitting. For operators to be chained together, all three conditions must hold:
first, no repartitioning between the operators;
second, the operators have the same parallelism;
third, the operators belong to the same slot-sharing group.
The purpose of chaining is to reduce network I/O.
7. Slot-sharing groups
// global parallelism = 1
// source --flatMap.slotSharingGroup("group1")-- map --keyBy-- sum.slotSharingGroup("group2")-- print
Is map in the default group or in group1?
The operator chains split as follows:
source in the default group / flatMap and map in group1 / keyBy, sum and print in group2 (an operator inherits the sharing group of its upstream, so map lands in group1).
Question to think about:
The number of slots is normally the maximum parallelism in the job; the maximum parallelism here is 2, yet 5 slots are used. Why?
Because different operators were assigned to different sharing groups.
Example 1:
// global parallelism = 1
// source --flatMap.slotSharingGroup("group1")-- map.setParallelism(3) --keyBy-- sum.slotSharingGroup("group2")-- print
This splits into 4 operator chains and uses 5 slots (the sharing group apparently follows a nearest-upstream inheritance rule).
So the earlier statement that the slot count equals the maximum operator parallelism only holds within a single sharing group; the complete rule is:
the number of slots a job needs is the sum of the maximum parallelism of each sharing group.
Example 2: in code:
// globally disable operator chaining
env.disableOperatorChaining();
keyBy and sum stay together as one operation; all other chains are split apart, and 3 slots are needed. Chaining can also be controlled per operator:
flatMap.startNewChain() splits from the upstream operator and starts a new chain;
flatMap.disableChaining() splits on both sides, leaving the operator in a chain of its own.
flatMap.startNewChain() and flatMap.disableChaining() each produce a different chain layout in the web UI (the original screenshots are not available).
Summary of operator chains and sharing groups:
1. Default: all operators are in one sharing group and chaining is enabled. Slots needed: the maximum parallelism. Number of chains: depends on wide dependencies (repartitioning) and whether operator parallelism matches.
2. Operators assigned their own sharing groups, chaining enabled (groups inherit from the nearest upstream). Slots needed: the sum of the maximum parallelism of each sharing group. Number of chains: depends on wide dependencies, parallelism, and sharing groups.
3. Chaining globally disabled, everything in one sharing group. Slots needed: the maximum parallelism. Number of chains: one per operator.
4. One operator starts a new chain, same sharing group. Slots needed: the maximum parallelism. Number of chains: depends on wide dependencies, parallelism, and the operator that started the new chain.
5. One operator disables chaining, same sharing group. Slots needed: the maximum parallelism. Number of chains: depends on wide dependencies, parallelism, and the operator that disabled chaining.
In short: the slot count depends only on the sharing groups and the maximum parallelism; the chain count depends on wide dependencies, parallelism, and whether chains are started or disabled. The sketch below exercises all of these knobs.
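A minimal sketch wiring the knobs together (host/port hypothetical; uncomment the global switch for case 3):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class ChainingKnobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // env.disableOperatorChaining();             // case 3: globally disable chaining
        env.socketTextStream("localhost", 9999)
                .flatMap((String line, Collector<String> out) -> {
                    for (String w : line.split(" ")) {
                        out.collect(w);
                    }
                })
                .returns(Types.STRING)
                .slotSharingGroup("group1")            // case 2: put flatMap (and downstream) in group1
                .map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .startNewChain()                       // case 4: split map from flatMap
                .keyBy(t -> t.f0)
                .sum(1)
                .disableChaining()                     // case 5: isolate sum in its own chain
                .print();
        env.execute();
    }
}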
subtask
Default sharing group: subtasks of the same operator always run in different slots, because Flink allocates from the perspective of idle time and resource sharing. For example, a job with parallelism 2:
slot1: source, flatMap, map, keyBy
slot2: flatMap, map, keyBy
Within slot1 the operators execute from top to bottom: once an upstream operator finishes, it frees its resources for the downstream operator, and everything stays on the same node. This reduces network transfer and uses the resources fully. If the two flatMaps shared one slot, they would be busy together and idle together, contending for resources.
1.6 Flink stream processing: core programming
1.6.1 Environment
// Batch processing environment
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
// Stream processing environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1.6.2 Source
@Test
// Read from a collection
public void SourceCollection() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Prepare the collection
    List<WaterSensor> waterSensors = Arrays.asList(
            new WaterSensor("ws_001", 1577844001L, 45),
            new WaterSensor("ws_002", 1577844015L, 43),
            new WaterSensor("ws_003", 1577844020L, 42));
    // Read from the collection
    DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromCollection(waterSensors);
    waterSensorDataStreamSource.print();
    env.execute();
}
// Read from a file
@Test
public void SourceFile() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<String> wordcount = env.readTextFile("F:\\bigdata\\flink\\flink_learn_code\\src\\main\\resources\\input\\wordcount");
    wordcount.print();
    env.execute();
}
// Read from a socket
@Test
public void SourceSocket() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<String> wordcount = env.socketTextStream("linux1", 9999);
    wordcount.print();
    env.execute();
}
// Read from Kafka
@Test
public void SourceKafka() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux1:9092,linux2:9092,linux3:9092");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flinkkafka");
    DataStreamSource<String> kafkaSD = env.addSource(new FlinkKafkaConsumer<>("sink_table", new SimpleStringSchema(), properties));
    kafkaSD.print();
    env.execute();
}
Custom source
package operateFunction;
import bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
public class MySurce {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .addSource(new MySource("linux1", 9999))
                .print();
        env.execute();
    }
    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;
        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }
        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            // A source that reads data from a socket
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = reader.readLine();
            while (isRunning && line != null) {
                String[] split = line.split(" ");
                ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
                line = reader.readLine();
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
1.6.3 transforms
① Operators that transform data
map, flatMap: one record in, several out ("explode"). Every Flink function class has a Rich version, which differs from the regular function in that it can access the runtime context and has lifecycle methods, so it can implement more complex, richer functionality, e.g. RichMapFunction.
② Repartitioning operators
package operateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TranceformRebalance {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStreamSource socketDS = env.socketTextStream("linux1", 9999);
SingleOutputStreamOperator map = socketDS.map(x -> x).setParallelism(2);
map.print("map").setParallelism(2);
// socketDS.keyBy(data -> data).print("keyby");
//随机分区
// socketDS.shuffle().print("shuffer");
//rebalance map1是先找一个开始分区,比如5,然后 45-12345-12345 map2:345-12345-12345
map.rebalance().print("rebalance");
//rescale 是先将分区平分,然后数据在每个分区内轮询。map1: 45-345-345 map2:12-12-12-12
map.rescale().print("rescale");
//并行度是1
// socketDS.global().print("gloal");
//此处会报错,8个并行度往socket一个并行度发会报错 forward 上下算子的并行度要相同
//socketDS.forward().print("forward");
//broadcast发往下游所有分区,每个一份
// socketDS.broadcast().print("broadcast");
//自定义重分区
env.execute();
}
}
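The custom-repartition placeholder can be filled with partitionCustom; a sketch continuing from the map stream above (the partitioning logic is made up for illustration):
map.partitionCustom(
        new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                // pick a downstream subtask from the key's hash
                return Math.abs(key.hashCode()) % numPartitions;
            }
        },
        (KeySelector<String, String>) value -> value   // use the whole record as the key
).print("custom");
(Partitioner comes from org.apache.flink.api.common.functions.Partitioner, KeySelector from org.apache.flink.api.java.functions.KeySelector.)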
③ Operators that control streams
split cannot change the data type after splitting; side-output streams can change the type.
- split and select: split tags records in a stream according to some characteristic, and select picks out the streams with a given tag; the data type cannot change, and side outputs (whose type can change) replace this pair (removed in 1.12).
- connect: connects two streams of different types (e.g. String and Integer); only two streams can be connected, and each keeps its own data type. It returns a ConnectedStreams, on which map, flatMap and process can be called.
- union: only connects streams of the same type, but can union two or more streams.
- join: often used together with windows.
The connect operator:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
ConnectedStreams<Integer, String> connect = intStream.connect(stringStream);
// TODO the map/flatMap of ConnectedStreams take a CoMapFunction/CoFlatMapFunction
connect.map(new CoMapFunction<Integer, String, Object>() {
    @Override
    public Object map1(Integer value) throws Exception {
        return value;
    }
    @Override
    public Object map2(String value) throws Exception {
        return value;
    }
});
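For comparison, a minimal union sketch on the same environment (the element values are made up):
DataStreamSource<String> s1 = env.fromElements("a", "b");
DataStreamSource<String> s2 = env.fromElements("c", "d");
DataStreamSource<String> s3 = env.fromElements("e");
// union accepts two or more streams, but they must all have the same type
s1.union(s2, s3).print();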
④ Aggregation operators
Aggregation operators must come after keyBy. The common rolling aggregations are sum, min, max, minBy, maxBy and reduce. If the stream holds POJOs or Scala case classes, pass the field name as the parameter; if it holds tuples, pass the position (0-based).
// max only updates the aggregated field; all other fields keep the values of the first record
SingleOutputStreamOperator<WaterSensor> vc = keyedDS.max("vc");
// maxBy also replaces the other fields. When two records tie on the max value but differ elsewhere,
// the second parameter decides which record's other fields to keep:
// first = true takes the fields of the earlier max record, first = false takes the current record's fields
SingleOutputStreamOperator<WaterSensor> vc1 = keyedDS.maxBy("vc", false);
// reduce: if both the maximum water level and the maximum timestamp of a WaterSensor are wanted, reduce is more flexible
SingleOutputStreamOperator<WaterSensor> reduceDS = keyedDS.reduce(new ReduceFunction<WaterSensor>() {
    @Override
    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
        // id: after keyBy all ids are equal, take either one; ts: take the latest time; vc: take the max water level
        return new WaterSensor(value1.getId(), value2.getTs(), Math.max(value1.getVc(), value2.getVc()));
    }
});
⑤ Rich functions
Example, RichMapFunction:
1. The open/close lifecycle methods are used when the program talks to a third-party system such as a database; each is called exactly once per parallel instance. For instance, when reading Kafka data in a map and writing it to MySQL, opening and closing a connection for every record is wasteful; instead create the connection in open and release it in close.
2. getRuntimeContext() returns the runtime context, which enables state programming.
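A sketch of the lifecycle pattern, assuming a hypothetical JDBC lookup (URL and credentials are placeholders):
public static class JdbcEnrichMap extends RichMapFunction<String, String> {
    private Connection connection;   // one connection per parallel instance

    @Override
    public void open(Configuration parameters) throws Exception {
        // called once per parallel instance, before the first map() call
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "user", "pwd");
    }

    @Override
    public String map(String value) throws Exception {
        // reuse the open connection for every record instead of reopening it;
        // the runtime context exposes, e.g., the subtask index
        return value + "-" + getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();   // called once per parallel instance on shutdown
        }
    }
}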
⑥ process
Finally the process operator: whatever the other operators cannot do, the lowest-level process operator can.
package operateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class TransformProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStreamSource<String> sockDS = env.socketTextStream("linux1", 9999);
        // process implementing the flatMap functionality
        SingleOutputStreamOperator<String> flatMapDS = sockDS.process(new MyFlatmapProcess());
        // process implementing the map functionality
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOneDS = flatMapDS.process(new MapProcessFunction());
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordToOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedDS.sum(1);
        sum.print();
        env.execute();
    }
    public static class MyFlatmapProcess extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            String[] split = value.split(" ");
            for (String word : split) {
                out.collect(word);
            }
            // runtime context, for state programming
            RuntimeContext runtimeContext = getRuntimeContext();
            // timers (note: registering timers actually requires a keyed stream at runtime)
            TimerService timerService = ctx.timerService();
            timerService.registerEventTimeTimer(123L);
            timerService.deleteEventTimeTimer(123L);
            // current processing time
            timerService.currentProcessingTime();
            // event time (current watermark)
            timerService.currentWatermark();
            // side output
            // ctx.output(...);
        }
        // methods specific to process functions:
        // lifecycle methods
        @Override
        public void open(Configuration parameters) throws Exception { }
        @Override
        public void close() throws Exception { }
    }
    public static class MapProcessFunction extends ProcessFunction<String, Tuple2<String, Integer>> {
        @Override
        public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(value, 1));
        }
    }
}
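The commented-out ctx.output(...) needs an OutputTag; a minimal side-output sketch (tag name and length rule are made up):
public static class SplitProcess extends ProcessFunction<String, String> {
    // created as an anonymous subclass so Flink can capture the type parameter
    public static final OutputTag<String> SHORT_WORDS = new OutputTag<String>("short-words") {};

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (value.length() < 3) {
            ctx.output(SHORT_WORDS, value);   // goes to the side output (its type may differ from the main stream)
        } else {
            out.collect(value);               // goes to the main stream
        }
    }
}
// usage:
// SingleOutputStreamOperator<String> mainDS = sockDS.process(new SplitProcess());
// mainDS.getSideOutput(SplitProcess.SHORT_WORDS).print("short");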
Exercise: deduplication
A HashSet can deduplicate, but with a large data volume an in-memory HashSet will not hold up; consider an external store such as Redis. Note: with Redis, still do a keyBy first (to avoid concurrent lookups missing each other). A Redis-based variant is sketched after the code.
package exercise;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.HashSet;
public class Distinct {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
// Deduplicate with a Set. Note: with multiple parallel instances the same word can still be output
// once per instance, so add a keyBy before the set so identical words land in the same partition
DataStreamSource<String> stringDS = env.socketTextStream("linux1", 9988);
HashSet<String> hashSet = new HashSet<>();
stringDS
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] splits = value.split(" ");
                for (String split : splits) {
                    out.collect(split);
                }
            }
        })
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        })
        .filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (hashSet.contains(value)) {
                    return false;
                } else {
                    hashSet.add(value);
                    return true;
                }
            }
        })
.print();
env.execute();
}
}
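The Redis variant mentioned above, sketched with the Jedis client (host/port and key name are placeholders):
public static class RedisDistinctFilter extends RichFilterFunction<String> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("localhost", 6379);   // hypothetical Redis instance
    }

    @Override
    public boolean filter(String value) {
        // SADD returns 1 only when the member was not yet in the set,
        // so the membership check and the insert are a single round trip
        return jedis.sadd("distinct:words", value) == 1L;
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
// usage: replace the HashSet-based FilterFunction above with .filter(new RedisDistinctFilter())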
1.6.4 sink
With a Kafka source + checkpoints (ck) + a Kafka sink, Flink can achieve exactly-once semantics.
① Kafka sink
package sink;
import bean.WaterSensor;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStreamSource socketDS = env.socketTextStream("linux1", 9999);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux1:9092");
socketDS.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] split = value.split(" ");
out.collect(new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])));
}
})
.map(new MapFunction() {
@Override
public String map(WaterSensor value) throws Exception {
return JSON.toJSonString(value);
}
})
.addSink(new FlinkKafkaProducer("sink_table",new SimpleStringSchema(),properties));
env.execute();
}
}
Input: wa_1 12345678 45
Output: {"id":"wa_1","ts":12345678,"vc":45}
② Redis sink
package sink;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class RedisSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStreamSource<String> socketDS = env.socketTextStream("linux1", 9999);
SingleOutputStreamOperator<WaterSensor> waterDS = socketDS.flatMap(new FlatMapFunction<String, WaterSensor>() {
    @Override
    public void flatMap(String value, Collector<WaterSensor> out) throws Exception {
        String[] split = value.split(" ");
        out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
    }
});
// RedisSink takes a FlinkJedisConfigBase and a RedisMapper
// FlinkJedisPoolConfig is the concrete config class; its constructor is private, so use the Builder
FlinkJedisPoolConfig flinkJedis = new FlinkJedisPoolConfig.Builder()
        .setHost("linux2")
        .setPort(6379)
        .build();
DataStreamSink<WaterSensor> redisSinkDS = waterDS.addSink(new RedisSink<>(flinkJedis, new MyRedisMapper()));
redisSinkDS.setParallelism(1);
env.execute();
}
public static class MyRedisMapper implements RedisMapper<WaterSensor> {
    @Override
    // specify the Redis data type to write to
    public RedisCommandDescription getCommandDescription() {
        // additionalKey ("sensor") is only used for hashes: it becomes the outer Redis key,
        // and getKeyFromData supplies the inner hash field
        return new RedisCommandDescription(RedisCommand.HSET, "sensor");
    }
    @Override
    // extract the key from the data
    public String getKeyFromData(WaterSensor data) {
        return data.getId();
    }
    @Override
    // extract the value from the data
    public String getValueFromData(WaterSensor data) {
        return data.getVc().toString();
    }
}
}
③ ES Sink
package sink;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
public class EsSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
// TODO 1. With an unbounded stream,
// data is only flushed out when the configured flush interval or batch size is reached
DataStreamSource<String> socketDS = env.socketTextStream("linux1", 9999);
// TODO 2. With a bounded stream,
// the stream ends and closing it flushes the remaining data out
DataStreamSource<String> fileDS = env.readTextFile("F:\\bigdata\\flink\\flink_learn_code\\src\\main\\resources\\input\\waterSensor");
SingleOutputStreamOperator<WaterSensor> waterDS = socketDS.flatMap(new FlatMapFunction<String, WaterSensor>() {
    @Override
    public void flatMap(String value, Collector<WaterSensor> out) throws Exception {
        String[] split = value.split(" ");
        out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
    }
});
// Write the data to ES
// ElasticsearchSink.Builder takes a List<HttpHost> and an ElasticsearchSinkFunction
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("linux1", 9200));
httpHosts.add(new HttpHost("linux2", 9200));
httpHosts.add(new HttpHost("linux3", 9200));
ElasticsearchSink.Builder<WaterSensor> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new MyElasticsearchSinkFunction());
// TODO 1. Bulk-flush setting: flush after every single record. Fine for testing;
// in production, hitting the cluster this often is slow
esSinkBuilder.setBulkFlushMaxActions(1);
ElasticsearchSink<WaterSensor> buildES = esSinkBuilder.build();
waterDS.addSink(buildES);
env.execute();
}
public static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<WaterSensor> {
    @Override
    public void open() throws Exception {
    }
    @Override
    public void close() throws Exception {
    }
    @Override
    // called for every record; RequestIndexer is used to add index/update/delete requests
    public void process(WaterSensor waterSensor, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        IndexRequest source = Requests.indexRequest()
                .index("sensor1")
                .type("_doc")
                .source(waterSensor);   // the data to write; a JavaBean here
        requestIndexer.add(source);
    }
}
}
④ Custom JDBC sink
package sink;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SelfSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStreamSource<String> socketDS = env.socketTextStream("linux1", 9999);
SingleOutputStreamOperator<WaterSensor> waterDS = socketDS.flatMap(new FlatMapFunction<String, WaterSensor>() {
    @Override
    public void flatMap(String value, Collector<WaterSensor> out) throws Exception {
        String[] split = value.split(" ");
        out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
    }
});
waterDS.addSink(new MySink());
env.execute();
}
// RichSinkFunction provides lifecycle methods: the connection is opened and closed once, which is efficient
// Combined with state programming, writes could also be batched after a given time or record count
// (a timer plus the size of a List)
public static class MySink extends RichSinkFunction<WaterSensor> {
    private Connection connection;
    private PreparedStatement preparedStatement;
    // open the connection
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://linux1:3306/realwarehouse?useSSL=false", "root", "000000");
        preparedStatement = connection.prepareStatement("INSERT INTO `sensor` VALUES(?,?,?) ON DUPLICATE KEY UPDATE `ts`=?,`vc`=?");
    }
    @Override
    public void invoke(WaterSensor value, Context context) throws Exception {
        // bind the placeholders
        preparedStatement.setString(1, value.getId());
        preparedStatement.setLong(2, value.getTs());
        preparedStatement.setInt(3, value.getVc());
        // the two placeholders of ON DUPLICATE KEY UPDATE
        preparedStatement.setLong(4, value.getTs());
        preparedStatement.setInt(5, value.getVc());
        preparedStatement.execute();
    }
    // close the connection
    @Override
    public void close() throws Exception {
        preparedStatement.close();
        connection.close();
        // for queries, the returned ResultSet must be closed as well
}
}
}
⑤ Flink JDBC sink
package sink;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class FlinkJDBCSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStreamSource<String> socketDS = env.socketTextStream("linux1", 9999);
final SingleOutputStreamOperator<WaterSensor> waterDS = socketDS.flatMap(new FlatMapFunction<String, WaterSensor>() {
    @Override
    public void flatMap(String value, Collector<WaterSensor> out) throws Exception {
        String[] split = value.split(",");
        out.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
    }
});
// The JdbcSink.sink variant that takes JdbcExecutionOptions writes in batches, controlled by these fields:
// private final long batchIntervalMs;
// private final int batchSize;
// private final int maxRetries;
waterDS.addSink(JdbcSink.sink(
        "INSERT INTO `sensor-0821` VALUES(?,?,?) ON DUPLICATE KEY UPDATE `ts`=?,`vc`=?",
        new JdbcStatementBuilder<WaterSensor>() {
            @Override
            public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                // bind the placeholders
                preparedStatement.setString(1, waterSensor.getId());
                preparedStatement.setLong(2, waterSensor.getTs());
                preparedStatement.setInt(3, waterSensor.getVc());
                preparedStatement.setLong(4, waterSensor.getTs());
                preparedStatement.setInt(5, waterSensor.getVc());
            }
},
JdbcExecutionOptions.builder().withMaxRetries(3).build()
,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://linux1:3306/realwarehouse?useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("000000")
.build()
));
env.execute();
}
}
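Following the batching fields noted in the comments above, the execution options can be tuned like this (the values are illustrative):
JdbcExecutionOptions.builder()
        .withBatchSize(100)           // flush after 100 buffered records...
        .withBatchIntervalMs(2000)    // ...or after 2 seconds, whichever comes first
        .withMaxRetries(3)
        .build();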
1.6.5 The Lombok plugin
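The notes leave this section empty; presumably it refers to declaring the WaterSensor POJO with Lombok, along these lines (field meanings inferred from the examples above):
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
// Lombok generates getters/setters, toString, equals/hashCode and the constructors,
// which covers what Flink expects from a POJO (public no-arg constructor plus getters/setters)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;   // sensor id
    private Long ts;     // timestamp
    private Integer vc;  // water level
}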
Flink CDC
1.1 Types of CDC
Query-based CDC vs. binlog-based CDC; the main differences:
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures all data changes | No | Yes |
| Latency | High | Low |
| Extra load on the database | Yes | No |
Flink CDC has Debezium built in. Its initialization mode is selectable: like Maxwell's historical-data sync, it can bootstrap existing data, and this is enabled by a configuration option rather than run by hand. Historical data is synchronized by querying all existing rows; once that finishes, it switches to binlog mode immediately. It also supports resuming from a saved position and failure recovery. Flink CDC is not documented on the Flink website; it lives on GitHub: https://github.com/ververica/flink-cdc-connectors
1.2 Writing Flink CDC code
package fenkong.realwarehouse.util;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// checkpointing (ck): without checkpoints, every restart reads from the latest position
env.enableCheckpointing(5000L); // one checkpoint every 5 seconds
// checkpointing mode: EXACTLY_ONCE; the alternative is AT_LEAST_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// state backend where the state is stored
env.setStateBackend(new FsStateBackend("hdfs://linux1:9820/flink/CDC/ck"));
// restart strategy (effectively keeps restarting)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 1000L));
// user name for accessing HDFS
System.setProperty("HADOOP_USER_NAME", "root");
// cancelling a job from the web UI deletes its checkpoints by default;
// to keep the checkpoints of cancelled jobs, add this setting
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// create the MySQL CDC source
Properties properties = new Properties();
// scan.startup.mode = initial: every restart re-reads the whole database, which is slow and
// may produce duplicate data; checkpointing solves this
properties.setProperty("scan.startup.mode", "initial");
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
        .hostname("linux1")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("realwarehouse")
        // must use the db.table form; the table name alone is not read successfully
        // optional: without this parameter, all tables of the listed databases are read
        .tableList("realwarehouse.customer")
        // Flink CDC embeds Debezium, so its properties can be configured here,
        // including the CDC initialization mode
        .debeziumProperties(properties)
        // the emitted type is customizable: StringDebeziumDeserializationSchema returns strings,
        // the other built-in option is RowDataDebeziumDeserializeSchema;
        // custom deserializers are supported too (an example follows later)
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();
// read the MySQL data
DataStreamSource<String> stringDS = env.addSource(mysqlSource);
// print (or map it and join the CDC stream with the data stream to read a MySQL config table dynamically)
stringDS.print();
// run
env.execute();
}
}
Troubleshooting:
Caused by: java.lang.ClassNotFoundException: fengkong.realwarehouse.util.FlinkCdcTest
Flink CDC is not part of Flink's core distribution, so the connector dependency (com.alibaba.ververica : flink-connector-mysql-cdc : 1.1.1) must be available when the job is packaged. There are two ways:
first, download the connector jar, drop it into Flink's lib directory, and restart the Flink cluster (a standalone test cluster here);
second, add a packaging plugin to the pom so that all dependencies are bundled into the project jar.
1.3 Submitting the job and taking a savepoint
1. Start the Flink program:
bin/flink run -c fenkong.realwarehouse.util.FlinkCDC /opt/software/realdata_flink-12-18-02.jar hdfs://linux1:9820
2. Copy the Flink job id and trigger a savepoint:
bin/flink savepoint 7f5351a08ed5f01dd6ebab66d79a784b hdfs://linux1:9820/flink/CDC/ck
This produces a path such as hdfs://linux1:9820/flink/CDC/ck/savepoint-2e43d4-cc7a8bade02b, which is what the restart uses.
3. Restart the job from the savepoint:
bin/flink run -s hdfs://linux1:9820/flink/CDC/ck/savepoint-7f5351-3139b82033ff -c fenkong.realwarehouse.util.FlinkCDC /opt/software/realdata_flink-12-18-02.jar
1.4 The same functionality with Flink SQL
package fenkong.realwarehouse.util;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLCDC {
public static void main(String[] args) throws Exception {
// Create the execution environments (stream environment and table environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// checkpointing would be configured here
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE user_info (" +
        " id INT," +
        " password STRING," +
        " mobile STRING," +
        " add_time INT," +
        " app_type INT" +
        ") WITH (" +
        " 'connector' = 'mysql-cdc'," +
        " 'hostname' = 'linux1'," +
        " 'port' = '3306'," +
        " 'username' = 'root'," +
        " 'password' = '000000'," +
        " 'database-name' = 'realwarehouse'," +
        " 'table-name' = 'customer'" +
        ")");
// executeSql submits the query eagerly, so no env.execute() is needed here
TableResult tableResult = tableEnv.executeSql("select * from user_info");
tableResult.print();
// printed output:
// +----+-------+----------+-------------+------------+----------+
// | op |    id | password |      mobile |   add_time | app_type |
// +----+-------+----------+-------------+------------+----------+
// | +I | 24830 |     4561 | 18732365952 | 1639547481 |        1 |
// | +I | 24831 |     1261 |  1777888952 | 1639547482 |        1 |
// | +I | 24832 |     2261 |  1677888952 | 1639547483 |        1 |
}
}
1.5 A custom deserializer
Section 1.2 returned the MySQL binlog data as strings, but each record prints as two raw objects that cannot be parsed; a custom deserializer is needed.
// .deserializer(new MyDebezium())
public static class MyDebezium implements DebeziumDeserializationSchema<String> {
    // deserialize: wrap the two returned objects into one JSON record
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // db & table
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String db = split[1];
        String table = split[2];
        // operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // the data (the "after" image)
        Struct value = (Struct) sourceRecord.value();
        Struct after = value.getStruct("after");
        JSONObject dataAfter = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            for (Field field : schema.fields()) {
                Object o = after.get(field);
                dataAfter.put(field.name(), o);
            }
        }
        Struct before = value.getStruct("before");
        JSONObject dataBefore = new JSONObject();
        if (before != null) {
            Schema schema = before.schema();
            for (Field field : schema.fields()) {
                Object o = before.get(field);
                dataBefore.put(field.name(), o);
            }
        }
        // JSON object wrapping the final result
        JSONObject result = new JSONObject();
        result.put("database", db);
        result.put("table", table);
        result.put("data", dataAfter);
        result.put("dataBefore", dataBefore);
        result.put("operation", operation.toString().toLowerCase()); // lower-cased
        collector.collect(result.toJSONString());
    }
    // declare the produced type
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
}
}
insert: only data (the after image)
update: both before and after
delete: only before
1.6 Dynamic splitting with a broadcast stream
An alternative for dynamic splitting: have Flink poll MySQL periodically, update a map, and then do the subsequent table creation and inserts.
Here instead Flink CDC listens to the MySQL binlog, forming a config stream; that stream is broadcast and then joined back with the main stream (the better option). A minimal sketch follows.
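A sketch of the broadcast pattern, with hypothetical stream names (cdcStream is the Flink CDC config stream, mainStream the data stream):
// descriptor for the broadcast state that holds the config entries
MapStateDescriptor<String, String> configDescriptor =
        new MapStateDescriptor<>("table-config", String.class, String.class);

BroadcastStream<String> configBroadcast = cdcStream.broadcast(configDescriptor);

mainStream.connect(configBroadcast)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // look up the routing rule broadcast earlier by the CDC stream (the key is illustrative)
                String rule = ctx.getBroadcastState(configDescriptor).get("customer");
                out.collect(value + " -> " + rule);
            }

            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // every parallel instance receives each config record and updates its broadcast state
                ctx.getBroadcastState(configDescriptor).put("customer", value);
            }
        })
        .print();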