栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

基于Prometheus的flink性能监控小坑记录

基于Prometheus的flink性能监控小坑记录

背景

公司内的flink集群跑了挺长一段时间了,一直也没有对其进行一个比较完整的监控,最近打算着手做这件事情,经过网上的调研,目前公司采用的部署模式是per-job模式,最终选用了基于prometheus,把job指标推送到中间网关的pushgateway上面,然后prometheus去抓取pushgateway上面的信息,从而实现对flink做性能监控,最后通过Grafana进行展示。

问题

在接入过程中,发现了一个问题: 我感觉prometheus的机制有点蠢!!!
为啥这么说?
pushgateway会为每一个flink job的taskmanager和jobmanager推送的指标数据都分别创建一个group,prometheus每隔一个周期去pushgateway上拉取一批GROUP的最新数据。 看似没什么问题, 但当一个flink job重新被调度或者重启后,会在pushgateway生成一个新的job group,而旧的group永远不会被删除。
prometheus可不管你这个group是不是过期了的,它就是无脑定期去所有注册了的GROUP上去拉数据, 也就意味着那些过期的GROUP永远拉的都是最后一条重复数据。

当然我也在想,就算pushgateway不管理对过期group的清理,那么flink起码在job的结束期间,要做一个主动的删除group的操作吧?
进入PrometheusPushGatewayReporter的源码,发现,诶,还真有:

   @Override
    public void close() {
        if (deleteonShutdown && pushGateway != null) {
            try {
                pushGateway.delete(jobName, groupingKey);
            } catch (IOException e) {
                log.warn(
                        "Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.",
                        jobName,
                        groupingKey,
                        e);
            }
        }
        super.close();
    }

但是,实际上,job结束时,这个方法并不会调用,不知道是不是flink的bug(我的flink版本是1.13)?

坑爹啊!!!

解决

可想而知,如果这么放油不管,group会膨胀到什么样子。。

于是就有了下面这段自动定时去清理的脚本:

trap 'echo "got sigterm" ; exit 0' SIGTERM

EXPIRATION_SECONDS=${EXPIRATION_SECONDS:-900}
PGW_URL=${PGW_URL:-http://192.168.1.1:9091}

#function convert_to_standardnotation(){
#    # convert number from scientific notation to standar d( ie  '1.5383780136826127e+09' )
#    printf '%.0f' $1
#}

function convert_to_standardnotation() {
    # convert number from scientific notation to standard( ie  '1.5383780136826127e+09' )
    echo $1 | awk '{printf("%.0f", $1)}'
}

function extract_pushgateway_variable(){
 local -r _METRIC=$1
 local -r _VARNAME=$2
 #echo 'push_time_seconds{instance="10.32.32.7",job="bk_jenkins"} 1.5383802210997093e+09' | sed -r 's/.*instance="([^"]*).*/1/g'
 echo $_METRIC | sed -r "s/.*${_VARNAME}="([^"]*).*/\1/g"
 # sample usage :
 # extract_pushgateway_variable 'push_time_seconds{instance="10.32.32.7",job="bk_jenkins"} 1.5383802210997093e+09' 'instance'
}

function check_metric_line(){
   local -r _line=$1
   METRIC_TIME=$(echo $_line | awk '{print $2}' )
   #echo "mtime = $_line -> $METRIC_TIME "
   METRIC_TIME=$(convert_to_standardnotation $METRIC_TIME)
   #echo "$CURRENT_TIME - $METRIC_TIME "
   METRIC_AGE_SECONDS=$((CURRENT_TIME-METRIC_TIME))

   if [ "$METRIC_AGE_SECONDS" -gt "$EXPIRATION_SECONDS" ]; then

    metricInstance=$(extract_pushgateway_variable "$_line" 'instance')
    metricJob=$(extract_pushgateway_variable "$_line" 'job')
    
    echo "[INFO] job should be deleted $metricJob  - $metricInstance  age: $METRIC_AGE_SECONDS "
  #  curl -s -X DELETE "$PGW_URL/metrics/job/${metricJob}/instance/${metricInstance}"
    curl -s -X DELETE "$PGW_URL/metrics/job/${metricJob}"
     fi


}


function check_expired_metric_loop(){

export CURRENT_TIME=$(date +%s)
METRICS_LIST=$(curl -s  $PGW_URL/metrics | egrep "^push_time_seconds")
echo "$METRICS_LIST" | while  read -r line || [[ -n "$line" ]]; do
   check_metric_line "$line"
done
sleep $((EXPIRATION_SEConDS / 3 ))

}
while : ; do
check_expired_metric_loop
done 

使用时只用替换PGW_URL地址即可

另外,这种另辟蹊径的方法其实也可以尝试,但是我嫌太麻烦了,就贴在这里记录一下吧:

https://blog.csdn.net/daijiguo/article/details/105453643

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604278.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号