什么叫容错机制?
容错的含义:Flink程序挂了,重启故障恢复到挂掉之前的状态;从当前系统的状态存盘,用于Flink故障恢复容错机制的核心: 应用程序的状态的一致性检查点 1.一致性检查点(Checkpoints)
Flink想要做到精准一致性,就需要用到一致性检查点;一致性检查点就是checkpoint一致性检查点是flink程序故障恢复机制的核心;一致性检查点就是在某一个时间点,flink程序中所有并行度中状态的一份快照;通过将算子中的状态进行持久化,用于flink程序故障重启仍然能够无缝衔接,实现不丢失数据+不重复计算;checkPoint是JobManager每隔一段时间发出的任务,检查点所保存的数据应该是flink程序中,所有任务处理完同一条数据后这一时刻的所有算子的状态;
图解一致性检查点
如图所示,数据流是:1 2 3 4 5 6 7 ……
假设当前checkpoint的设定的是处理完5后,将所有算子状态进行保存:
source处理完5后,source的状态保存为5sum_even处理完5后,状态保存为6sum_odd处理完5后,状态保存为9
可以知道,流处理过程中,数据流到算子是有前后时间顺序的,假设上游算子A处理完5后保存了状态,下游算子B还未处理5,此时job挂掉了,那么整个Checkpoint就是失败的,而checkpoint过程也是一个事务,如果ck失败就会回滚;
在代码中开启检查点
flink中默认是关闭检查点的CheckPoint是JobManager发出的一个任务,每隔一段固定的时间做一次CheckPoint,保存各个Task的状态开启检查点的目的就是为了故障恢复,检查点会存储状态,如果不需要故障恢复就不需要做检查点。 2.从检查点恢复状态
(1)假设在处理7的时候,sum_odd任务挂掉了
(2)从检查点恢复状态
source可以恢复偏移量,上游重新消费;
从检查点恢复状态总结:
1.当flink遇到故障,首先就是重启应用,重启应用的时候,所有的Task的状态会重置为初始值;
2.然后Flink会使用最近的检查点来恢复应用程序中各个Task的状态,重新启动处理流程。JobManager中会记录应用程序的拓扑结构,读取状态,恢复每个Task的状态,Source要根据自己保存的状态重新提交偏移量,从偏移量后面重新读取数据,保证了数据不丢失。
3. source开始消费,应用程序开始处理
一致性检查点:所有任务处理完同一条数据后这一时刻的所有算子的状态;
如何实现所有任务处理完同一条数据后保存状态?
Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能。
3.1 检查点分界线:barrier
flink的检查点算法用到了一种称为分界线的特殊数据;分界线是有JobManager发起checkpoint命令的时候,往source中插入的一条特殊数据,可以将流上的数据按照不同的检查点分开;当subTask遇到barrier的时候,就会将当前状态保存barrier和waterMark一样是广播传输的
如果source是多并行度的,jobManager会往每一个并行度注入一个barrier;对于下游算子来说,必须接收到两个barrier之后才能做checkpoint,这叫做barrier对齐算法;
3.2 barrier对齐 - 精准一次
精准一次指的是每条数据只被处理一次;barrier的对齐机制,是保证精准一次的必要条件;到达了,就缓存起来不处理,没到达就继续处理;
sink收到barrier的时候会向JobManager通知checkpoint完成;当所有的sink都向JM通知后,才能算当前checkpoint完成了; 3.3 barrier非对齐 - 至少一次
barrier对齐的缺点: 对于barrier已经到达的分区,继续到达的数据会被缓存,这样的话假如另一个分区的barrier迟迟不来,会导致内存膨胀;flink自带反压机制,当内存不足的时候,会通知前面不要再接收数据了。
flink11中引入了非barrier对齐: 对于barrier已经到达的分区,继续到达的数据会被正常计算处理;这就会导致如果任务挂了,这些barrier后的数据会被重新计算一次,这就导致了at least onece
4.保存点(SavePoint)
(1)Save point和Check point的算法是一样的,用的是barrier对齐算法;而且save point可以认为是比check point多一些额外元数据的检查点。
(2)save point 与 check point 的作用不同
checkPoint用于task故障自动重启:某个task挂了,会自动重启task;如果手动cancel job,checkPoint目录也会被删掉。savePoint用于给Job做备份,如果想要手动cancel job,就需要使用savePoint
(3)save point 与check point的启动不同
checkpoint是flink自动做的,检查点是jobManager自动发出的,会定时发出checkPoint任务进行存盘savepoint是手动做的;
(4)实际开发中checkPoint和savePoint都需要做
比如要修改已经上线的流计算应用,比如某个地方空指针,增加校验逻辑,只要不修改状态的逻辑,那么可以将程序从保存点启动。
如何从sp或者cp恢复任务? Checkpoint演示
(1) IDEA中开发wordcount程序,并且开启checkpoint
(2)启动程序
由于在windows IDEA下开发的,因此需要导入hdfs的依赖;
直接操作hdfs会有权限问题,hdfs目录文件有用户组的;用hdfs命令修改ck目录权限即可;
执行代码,在端口输入 hello world,会打印hello 1 word 1
(3)查看checkpoint目录文件
在代码中设置的目录是存放Job的目录;Job启动后,会在此目录下创建一个以JobID命名的目录;也就是说一个Job对应一个CheckPoint目录;
在JobID目录下,有三个目录:
chk-7 :此目录后缀每隔一段时间更新一次;时间间隔是代码中设定的;sharedtaskowned
(3) 关闭flink程序,然后重新启动,再次输入hello word,会发现是重新开始计算的,因为没有用特定的方式恢复
savePoint使用演示
1.第一次打包提交job
或者命令行提交:
2. 输入hello word,让程序执行
3.手动设置savepoint,保存状态
4.查看savepoint 文件
5.在输入一些数据,让程序往前推进
6.手动cancel 任务
手动cancel任务会发现checkpoint目录被删除,因为ck的作用是任务挂了自己重启使用的;如果正常cancel任务,就会把ck删除;所以手动cancel不能够从ck恢复如果从IDEA 中终止任务,不会删除ck,是因为这种场景下相当于遇到了异常,还没来得及删除JVM就停了
7.从savepoint恢复任务
8 再次输入一些数据,会从保存点的结果状态继续执行
(1) 设置cancel任务不删除
(2)



