栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

一次数据库扩容导致FlinkCDC作业挂掉引发的思考

一次数据库扩容导致FlinkCDC作业挂掉引发的思考

查看yarn日志发现报下面的错

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

谷歌了一下 发现说是 导致数据源不太稳定 导致的,同时建议把开启自动重启功能,这里我看了下线上的代码,

发现重启策略配置的是 失败不重启,本以为很离谱,但是后来问了才发现说 原来任务是1.5版本起的,

在没有完全全量阶段的数据的ck之前,重启会锁表,导致业务出现问题,所以这里配置的不重启,
但目前基本都是使用的是2.0,所以重新配置了策略为(重启的次数+固定间隔:重启3次,每次间隔10s 或者 失败率:5分钟内失败3次,每次间隔10s)

解决:还是重新调整了重启策略,等扩容结束,作业启动OK。

State:

接受数据— task — 输出

	1.获取状态
	2.更新状态

数据源可以回放
Checkout State 存储位置能持久化(State Backend保存点:内存 外部存储 RocksDB)

所以为啥需要State?
后面批次的计算结果 是依赖于 前面批次计算结果的 ,这就是有状态 。比如wordcount
这个时候我们就需要把每次的计算结果(状态)保存到 某一个地方

当我们的程序挂掉之后,你想重启,从上次挂掉的地方开始消费
22点挂了 offset
重启之后2150开始 重复消费
重启之后2205开始 漏数据

怎么知道程序挂掉的时候,你消费到了那个位置呢?这个就是offset
Flink这个框架会把你程序挂掉时候,已经消费的offset给存下来
当你重启的时候,会从该保存的offset开始重新消费
这里的flink offset 就是作为一个状态state 进行管理的 ,不需要手工管理,使用Flink提供的配置即可
即:Flink 定时的周期性的把我们的 state 存储到某一个地方去

State类型:
Keyed State  : If you want to use keyed state, you first need to specify a key on a DataStream
	ValueState  : 数据加进去  获取  更新  
	ListState : 数据加进去  获取  更新  
	ReducingState : 
	AggregatingState :单个值的聚合,传入一个,输出一个
	MapState : 维护了一个map的映射  

这些状态的对象 仅仅和state 进行交互 ,可以存储到内存和hdfs等外部存储 

Operator State :和并行度相关,消费Kafka的数据 对于Flink而言消费的位置是保存在  Operator State  里面的 ,所以也是很重要  
	Kafka中指定Topic的partition及groupid对应offset
	Broadcast State:Operator State 的一种特例

开启Checkpoint
设置重启策略
设置StateBackend

补:
1.原始日志数据清洗(思考有哪些处理3种以上)
2.清洗之后进行各种维度数据统计
(1)不带窗口
(2)Window+Watermaker 1.sum 的方式拿不到开始时间 结束时间的 2.状态编程
3.数据Sink(MySQL Redis)

1.Flink布隆过滤器
2.Java Http请求并处理返回值(类似Python requests)
3.Flink RichMapFunction 的理解(可以获取类的生命周期open()方法)

4.状态编程 cleanStream.keyBy(x -> x.deviceType).process(new KeyedProcessFunction() {}
4.1 原来是根据数据中的某个字段
4.2 现在我们是根据每个device来进行判断是否是新老用户
来一条数据可以把设备ID存储到 Flink state 里面,然后后面的数据和这里存储的数据做一个对比即可
实现: 状态 + 布隆过滤器
大致处理逻辑:要使用状态 首先需要调用生命周期方法 ,先得到一个上下文的state (这里又先要构造一个描述器)

Checkpoint 调优点:

状态后端:基于内存 文件系统 rocksdb 三类

checkpoint 除了存元数据还有各个算子的结果数据状态
rocksdb:Flink内嵌数据库,用在管理内存上 ,由flink帮我们管理
先写内存 再刷写到磁盘

调优点:如果你的中间状态数据量大那么 管理内存就要设置大一点 减少刷写到磁盘次数

1.开启 state 状态性能监控 配置
可以在提交命令中加参数 1.13 之后才有
瓶颈在读,webui可以看到读状态的消耗时间

调优点:webui:里面代表的是每次checkpoint的数据量,不是累积

2.开启 增量检查点 *****
liststate 1,2,3 做ck ,
对于增量的 4,5,6就可以做增量,不用每次间隔做ck都做全量的
开启的时候设置为true

调优点:不要每次都是存状态全量,而是做增量

3.开启 本地状态恢复
ck数据还没落到hdfs,直接从本地内存中读取

4.设置rocksdb存储到多块磁盘上的多目录️

反压浅析:

原理 定位 解决
1.flink拓扑图是一个pipiline
flink 每个task之间 (算子之间)都是以阻塞队列的方式传输,当下游 来不及消费队列 导致被占满之后,上游的生产也会被阻塞
发送端 输出缓冲区
接收端 输入缓冲区

什么场景会发生:短时间的数据高峰
会影响什么:首先就是checkpoint时间变长 stat变大

2.怎么来定位:自带的webui
繁忙程度,颜色不同,一般蓝色,灰色一般就是反压,一般是第一个ok的算子

数据膨胀:flatmap 一进多出 ,数据膨胀
打散操作链,可以看到具体哪个算子导致的

3.原因
数据倾斜
资源不够
gc 老年代一般不会gc,频繁full gc 还一直回收不掉
sink到下游 :考虑下游能不能处理得过来

数据倾斜:可以ui上看到反压算子的subtask接收发送的数据量大小

gc:
开启火焰图
提交命令中指定打印gc日志,重点关注老年代剩余大小
full gc 大小m
老年代2-3 倍m ,新生代 1-1.5倍,整体3-4倍

外部系统交互:
source端读取数据性能低或者sink写入性能低
维表join时候的性能,增大缓存条数

demo:

      //获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
        env.enableCheckpointing(1000);
        // 间隔10秒 重启3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.seconds(10)));
        //5分钟内若失败了3次则认为该job失败,重试间隔为10s
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));
        //不重试
        env.setRestartStrategy(RestartStrategies.noRestart());
        // 是否保留 : 作业失败是否保留已有的checkpoint
        // 设置StateBackend : env.setStateBackend()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752101.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号