查看yarn日志发现报下面的错
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy谷歌了一下 发现说是 导致数据源不太稳定 导致的,同时建议把开启自动重启功能,这里我看了下线上的代码,
发现重启策略配置的是 失败不重启,本以为很离谱,但是后来问了才发现说 原来任务是1.5版本起的,
在没有完全全量阶段的数据的ck之前,重启会锁表,导致业务出现问题,所以这里配置的不重启,
但目前基本都是使用的是2.0,所以重新配置了策略为(重启的次数+固定间隔:重启3次,每次间隔10s 或者 失败率:5分钟内失败3次,每次间隔10s)
解决:还是重新调整了重启策略,等扩容结束,作业启动OK。
State:接受数据— task — 输出
1.获取状态 2.更新状态
数据源可以回放
Checkout State 存储位置能持久化(State Backend保存点:内存 外部存储 RocksDB)
所以为啥需要State?
后面批次的计算结果 是依赖于 前面批次计算结果的 ,这就是有状态 。比如wordcount
这个时候我们就需要把每次的计算结果(状态)保存到 某一个地方
当我们的程序挂掉之后,你想重启,从上次挂掉的地方开始消费
22点挂了 offset
重启之后2150开始 重复消费
重启之后2205开始 漏数据
怎么知道程序挂掉的时候,你消费到了那个位置呢?这个就是offset
Flink这个框架会把你程序挂掉时候,已经消费的offset给存下来
当你重启的时候,会从该保存的offset开始重新消费
这里的flink offset 就是作为一个状态state 进行管理的 ,不需要手工管理,使用Flink提供的配置即可
即:Flink 定时的周期性的把我们的 state 存储到某一个地方去
Keyed State : If you want to use keyed state, you first need to specify a key on a DataStream ValueState : 数据加进去 获取 更新 ListState : 数据加进去 获取 更新 ReducingState : AggregatingState :单个值的聚合,传入一个,输出一个 MapState : 维护了一个map的映射 这些状态的对象 仅仅和state 进行交互 ,可以存储到内存和hdfs等外部存储 Operator State :和并行度相关,消费Kafka的数据 对于Flink而言消费的位置是保存在 Operator State 里面的 ,所以也是很重要 Kafka中指定Topic的partition及groupid对应offset Broadcast State:Operator State 的一种特例
开启Checkpoint
设置重启策略
设置StateBackend
补:
1.原始日志数据清洗(思考有哪些处理3种以上)
2.清洗之后进行各种维度数据统计
(1)不带窗口
(2)Window+Watermaker 1.sum 的方式拿不到开始时间 结束时间的 2.状态编程
3.数据Sink(MySQL Redis)
1.Flink布隆过滤器
2.Java Http请求并处理返回值(类似Python requests)
3.Flink RichMapFunction 的理解(可以获取类的生命周期open()方法)
4.状态编程 cleanStream.keyBy(x -> x.deviceType).process(new KeyedProcessFunction
4.1 原来是根据数据中的某个字段
4.2 现在我们是根据每个device来进行判断是否是新老用户
来一条数据可以把设备ID存储到 Flink state 里面,然后后面的数据和这里存储的数据做一个对比即可
实现: 状态 + 布隆过滤器
大致处理逻辑:要使用状态 首先需要调用生命周期方法 ,先得到一个上下文的state (这里又先要构造一个描述器)
状态后端:基于内存 文件系统 rocksdb 三类
checkpoint 除了存元数据还有各个算子的结果数据状态
rocksdb:Flink内嵌数据库,用在管理内存上 ,由flink帮我们管理
先写内存 再刷写到磁盘
调优点:如果你的中间状态数据量大那么 管理内存就要设置大一点 减少刷写到磁盘次数
1.开启 state 状态性能监控 配置
可以在提交命令中加参数 1.13 之后才有
瓶颈在读,webui可以看到读状态的消耗时间
调优点:webui:里面代表的是每次checkpoint的数据量,不是累积
2.开启 增量检查点 *****
liststate 1,2,3 做ck ,
对于增量的 4,5,6就可以做增量,不用每次间隔做ck都做全量的
开启的时候设置为true
调优点:不要每次都是存状态全量,而是做增量
3.开启 本地状态恢复
ck数据还没落到hdfs,直接从本地内存中读取
4.设置rocksdb存储到多块磁盘上的多目录️
反压浅析:原理 定位 解决
1.flink拓扑图是一个pipiline
flink 每个task之间 (算子之间)都是以阻塞队列的方式传输,当下游 来不及消费队列 导致被占满之后,上游的生产也会被阻塞
发送端 输出缓冲区
接收端 输入缓冲区
什么场景会发生:短时间的数据高峰
会影响什么:首先就是checkpoint时间变长 stat变大
2.怎么来定位:自带的webui
繁忙程度,颜色不同,一般蓝色,灰色一般就是反压,一般是第一个ok的算子
数据膨胀:flatmap 一进多出 ,数据膨胀
打散操作链,可以看到具体哪个算子导致的
3.原因
数据倾斜
资源不够
gc 老年代一般不会gc,频繁full gc 还一直回收不掉
sink到下游 :考虑下游能不能处理得过来
数据倾斜:可以ui上看到反压算子的subtask接收发送的数据量大小
gc:
开启火焰图
提交命令中指定打印gc日志,重点关注老年代剩余大小
full gc 大小m
老年代2-3 倍m ,新生代 1-1.5倍,整体3-4倍
外部系统交互:
source端读取数据性能低或者sink写入性能低
维表join时候的性能,增大缓存条数
demo:
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 间隔10秒 重启3次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.seconds(10)));
//5分钟内若失败了3次则认为该job失败,重试间隔为10s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));
//不重试
env.setRestartStrategy(RestartStrategies.noRestart());
// 是否保留 : 作业失败是否保留已有的checkpoint
// 设置StateBackend : env.setStateBackend()



