1 熟悉内存管理布局,合理分配内存计算,可以适当调整各个内存比重 2 合理设置CPU资源 (slot隔离内存,但共享CPU,一个slot可能会同时运行多个task) 例子: 启动参数设置有4个容器,每个TM 2个slot,每个container 2个cpu core,并行度5。则5/2->需要启动3个TM+1个JobManger, 一个TM启动2个cpu,则2*3+1个JobManger CPU,共7个CPU。如果这个例子每个节点只用一个cpu则效率就会很差 3 合理设置并行度 压测初始并行度10以下,测试单个并行度的处理上限。然后并行度=QPS/单并行度处理能力 根据高峰期QPS压测,并行度1.2倍,富裕一些资源。kafka可以先积压数据,然后启动flink任务直接反压 source端并行度设为kafka 对应topic分区数。如果消费速度跟不上生产速度可以扩大kafka分区数,同时并行度对齐 transform端并行度 keyby之前的算子,一般不会做太多操作,如map/filter等,所以并行度可以和source端保持一致 keyby之后的算子,并行度如并发较大则设置为2的幂次方。小并发/大并发但无keyby则都可不设置为2的幂次方 sink端并行度 根据下游抗压能力评估,且根据sink段数据量大小判定 如下游为kafka,则并行度数量为kafka分区数量大状态调优
1 开启state性能监控,可以观察state延迟情况
2 开启RocksDB增量检查点 (内存不支持) checkpoint全量存储变成增量存储
3 RocksDB本地恢复(内存不支持) 由HDFS读取数据恢复改为直接读取本地节点数据
4 多目录到多磁盘,减小IO压力
后面备选
5 为RocksDB提供预定义的选项,如机械硬盘+内存模式 6 调整blockCache 所有rocksDB共享一个block cache,增加读缓存可以增加缓存命中几率,减小刷写磁盘频率。默认8mb,可改为64-256mb 7 调整writeBuffer和level阈值参数 rocksDB中每个state使用一个column family,列族的writeBuffer写缓存默认64mb,可以调大 每层缓存大小max-size-level-base也要相应调大,这样缓冲数据不会分层太多,默认256mb,阈值过小会导致层次过多,能存放的文件过小 8 增大每个column family对应的write buffer最大数量 可简单认为为内存中只读表的数量,也就是图上ReadOnly MemTable数量 9 增加后台线程数和合并线程数 用于图上后台Flush和sst文件的线程数,默认为1。可以调大,机械硬盘用户可改为4 10 增大writeBuffer最小合并数量 数据flush到磁盘时,需要合并的writeBuffer最小数量。默认1,可改为3 11 开启分区索引功能 内存只保存数据的多级索引的最上层,当需要时内存才记录后面层级索引。节省内存。适合内存较小的场景
checkpoint的设置
反压处理Credit-based反压机制
1.5 版本之前反压的实现通过TCP-based反压机制,1.5之后为Credit-based反压机制 Credit-based简单来说得优化就是接收方每次ack返回都会不断的主动告诉上游发送方我还能接收多少数据,不能再接收时则不再发送了 如下图InputChannel先接收,满了则Local BufferPool接收,满了则Network BufferPool接收。都满了则主动通知不能再发送了
危害
定位方式
1 利用WebUI定位 2 利用Metrics定位 Flink 1.9 及更高版本,inPoolUsage 还细分为 exclusiveBufferUsage(每个Channel 独占的 Buffer)和 floatingBufferUsage(按照 Channel 需求,动态分配和归还的 Buffer)
反压处理
1 数据倾斜 2 资源不够 3 GC 4 外部组件交互问题 5 代码问题,死循环等数据倾斜
keyby后的数据倾斜 第一种情况: 因为是流式计算,拆分任务如二次聚合等思路执行会导致最终结果会重复累加(非FlinkSql,未使用回撤流),不准确。 因为比如某节点处理一条数据后如sum计算为1,来第二条相同数据时因为是流式会保存上一个数据的状态后继续sum为2, 当最后节点全部汇总时sum为1+2=3,而不是我们期望的1+1=2,所以结果错误了 1 所以这种场景可以使用本地代码中提前批量预聚合 2 有直接的 FlinkSQL (开启攒批的 MiniBatch + 本地聚合和全局聚合两阶段聚合的 LocalGlobal,不考虑延迟情况可以无脑开启) 第二种情况: keyby后窗口聚合操作的数据倾斜 因为使用了窗口,所以数据变成了有界的批攒处理,所以这里可以使用二次聚合,一个窗口内数据做随机数打散再聚合 keyby前的数据倾斜 例子1:如在keyby之前,数据源kafka的不同分区数量就非常不平均 例子2:例如kafka3个分区,flink source并行度为1,就有一个任务要处理2个kafka分区数据 这种情况可以强制让flink任务进行shuffle,rebalance,rescale等算子重新打散数据Job优化
1 每个算子制定唯一用户 UUID,观察的时候方便算子之间的区分。默认是JobGraph自动生成 2 链路延迟测量 3 开启对象重用 类似浅拷贝,直接引用指向而不是新建对象 算子A数据对象同时传递给算子B和算子C,如果算子B对A数据做了修改,则C的数据也被同时修改了,但C不期望修改 因为是引用,所以会导致数据不安全和计算结果错误。使用场景需要注意 4 细粒度滑动窗口优化 (即滑动窗口远远大于滑动步长的情况,如窗口为1天数据,窗口步长仅1分钟,重叠窗口次数会过多,一个数据属于多个窗口,性能下降)FlinkSQL优化
TTL
MiniBatch
增加延迟来增强高吞吐
LocalGlobal 解决数据倾斜
必须同时开启MiniBatch+LocalGlobal,因为得先微批积攒后才能本地预聚合
1.9 版本之后 Count/Distinct 的自动打散
由常规的二次聚合方式改成自动的将需要distinct/count的数据进行hash打散 默认不开启,需要主动开启,默认hash打散mod1024
多维Distinct使用Filter
如下图。flink优化器识别后,filter可以共享状态实例,而不是三个状态实例,提高性能



