Expiring XXX record(s) for XXX:120015 ms has passed since batch creation

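    Background: we were load-testing the aggregation job for the DWS-layer exposure (user + scene) model, with a backlog of about 2 billion records built up beforehand.
    Symptom: after the Flink job started, checkpoints failed frequently, and the reported failure reason was "Failure reason: Checkpoint was declined."
    Log from the incident: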
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 8 for operator aggregate -> Sink: exp sink (86/160). Failure reason: Checkpoint was declined.at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) ...
Caused by: org.apache.flink.util.SerializedThrowable: Failed to send data to Kafka: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ...
Caused by: org.apache.flink.util.SerializedThrowable: Expiring 2483 record(s) for 【topic_name】-85:120015 ms has passed since batch creation ...
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    Cause:
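    The root cause is that the Kafka producer sends messages in batches. Each ProducerRecord is first stored in a local buffer, and the time a message may stay in that buffer is limited (by request.timeout.ms). When the message volume is very large, messages that sit in the buffer longer than this limit trigger the error above: Expiring XXX record(s) for XXX:120015 ms has passed since batch creation. We had also enabled end-to-end exactly-once semantics, i.e. Kafka transactions. In that mode each checkpoint is tied to the pre-commit of the pending messages, so a failed pre-commit fails the checkpoint. Once the number of failed checkpoints exceeds the tolerable threshold, the job restarts and a large backlog of messages builds up.
    Solution: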
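    a) Increase request.timeout.ms so that messages can stay in the buffer longer. With Kafka clients 2.1 and later, the upper bound on this wait is delivery.timeout.ms (default 120000 ms, which matches the 120015 ms in the log). It must be at least linger.ms + request.timeout.ms, so raise delivery.timeout.ms as well.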
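    b) Our company applies a rate limit to every producer. Raising this producer's limit lets it send faster, so messages no longer pile up in the local buffer.
    [Screenshot: the checkpoint failure, in which one or more parallel subtasks fail their checkpoint]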
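The cause and the two remedies can be illustrated with a toy Python model of the producer's local buffer. This is a simplification written only for illustration, not the Kafka client's actual RecordAccumulator logic. The `Batch` class, the `simulate` function and all rates are hypothetical. Everything drains through one FIFO queue instead of per-partition queues, and a batch may be sent in pieces. The model suggests that while arrivals keep outpacing sends, a longer timeout only postpones the expiry, whereas a send rate (for example, a producer quota) that keeps up with arrivals leaves nothing to expire.

```python
# Hypothetical toy model of batch expiry in a producer buffer.
# Not the Kafka client's RecordAccumulator code; names and rates are illustrative.
from collections import deque
from dataclasses import dataclass


@dataclass
class Batch:
    """Hypothetical stand-in for one producer batch waiting in the buffer."""
    created_ms: int
    records: int


def simulate(arrival_per_s: int, send_per_s: int, timeout_ms: int, duration_s: int) -> int:
    """Return how many records expire in the toy buffer.

    Each simulated second, one new batch of `arrival_per_s` records is appended.
    Batches that have waited longer than `timeout_ms` are then dropped whole
    (the "Expiring N record(s) ... ms has passed since batch creation" case).
    Finally the sender drains at most `send_per_s` records, oldest first,
    for example because a producer rate limit caps its throughput.
    """
    buffer = deque()
    expired = 0
    for second in range(duration_s):
        now_ms = second * 1000
        buffer.append(Batch(created_ms=now_ms, records=arrival_per_s))

        # Expire every batch that has waited longer than the timeout.
        while buffer and now_ms - buffer[0].created_ms > timeout_ms:
            expired += buffer.popleft().records

        # Send up to this second's budget, oldest batch first.
        budget = send_per_s
        while buffer and budget > 0:
            head = buffer[0]
            sent = min(head.records, budget)
            head.records -= sent
            budget -= sent
            if head.records == 0:
                buffer.popleft()
    return expired


if __name__ == "__main__":
    # Arrivals outpace the rate-limited sender, so records start expiring.
    print(simulate(200, 100, timeout_ms=120_000, duration_s=600))
    # Remedy a): a longer timeout avoids expiry within this 10-minute window,
    # but the backlog still grows every second.
    print(simulate(200, 100, timeout_ms=600_000, duration_s=600))  # 0
    # Remedy b): a send rate that keeps up with arrivals leaves nothing to expire.
    print(simulate(200, 200, timeout_ms=120_000, duration_s=600))  # 0
```

In the first call the backlog grows by 100 records per second. Batches therefore end up waiting longer than 120 s before they can be sent, and they expire.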
Reposting notice: please credit this article as reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/707563.html