公司内的flink集群跑了挺长一段时间了,一直也没有对其进行一个比较完整的监控,最近打算着手做这件事情,经过网上的调研,目前公司采用的部署模式是per-job模式,最终选用了基于prometheus,把job指标推送到中间网关的pushgateway上面,然后prometheus去抓取pushgateway上面的信息,从而实现对flink做性能监控,最后通过Grafana进行展示。
问题在接入过程中,发现了一个问题: 我感觉prometheus的机制有点蠢!!!
为啥这么说?
pushgateway会为每一个flink job的taskmanager和jobmanager推送的指标数据都分别创建一个group,prometheus每隔一个周期去pushgateway上拉取一批GROUP的最新数据。 看似没什么问题, 但当一个flink job重新被调度或者重启后,会在pushgateway生成一个新的job group,而旧的group永远不会被删除。
prometheus可不管你这个group是不是过期了的,它就是无脑定期去所有注册了的GROUP上去拉数据, 也就意味着那些过期的GROUP永远拉的都是最后一条重复数据。
当然我也在想,就算pushgateway不管理对过期group的清理,那么flink起码在job的结束期间,要做一个主动的删除group的操作吧?
进入PrometheusPushGatewayReporter的源码,发现,诶,还真有:
@Override
public void close() {
if (deleteonShutdown && pushGateway != null) {
try {
pushGateway.delete(jobName, groupingKey);
} catch (IOException e) {
log.warn(
"Failed to delete metrics from PushGateway with jobName {}, groupingKey {}.",
jobName,
groupingKey,
e);
}
}
super.close();
}
但是,实际上,job结束时,这个方法并不会调用,不知道是不是flink的bug(我的flink版本是1.13)?
坑爹啊!!!
解决可想而知,如果这么放油不管,group会膨胀到什么样子。。
于是就有了下面这段自动定时去清理的脚本:
trap 'echo "got sigterm" ; exit 0' SIGTERM
EXPIRATION_SECONDS=${EXPIRATION_SECONDS:-900}
PGW_URL=${PGW_URL:-http://192.168.1.1:9091}
#function convert_to_standardnotation(){
# # convert number from scientific notation to standar d( ie '1.5383780136826127e+09' )
# printf '%.0f' $1
#}
function convert_to_standardnotation() {
# convert number from scientific notation to standard( ie '1.5383780136826127e+09' )
echo $1 | awk '{printf("%.0f", $1)}'
}
function extract_pushgateway_variable(){
local -r _METRIC=$1
local -r _VARNAME=$2
#echo 'push_time_seconds{instance="10.32.32.7",job="bk_jenkins"} 1.5383802210997093e+09' | sed -r 's/.*instance="([^"]*).*/1/g'
echo $_METRIC | sed -r "s/.*${_VARNAME}="([^"]*).*/\1/g"
# sample usage :
# extract_pushgateway_variable 'push_time_seconds{instance="10.32.32.7",job="bk_jenkins"} 1.5383802210997093e+09' 'instance'
}
function check_metric_line(){
local -r _line=$1
METRIC_TIME=$(echo $_line | awk '{print $2}' )
#echo "mtime = $_line -> $METRIC_TIME "
METRIC_TIME=$(convert_to_standardnotation $METRIC_TIME)
#echo "$CURRENT_TIME - $METRIC_TIME "
METRIC_AGE_SECONDS=$((CURRENT_TIME-METRIC_TIME))
if [ "$METRIC_AGE_SECONDS" -gt "$EXPIRATION_SECONDS" ]; then
metricInstance=$(extract_pushgateway_variable "$_line" 'instance')
metricJob=$(extract_pushgateway_variable "$_line" 'job')
echo "[INFO] job should be deleted $metricJob - $metricInstance age: $METRIC_AGE_SECONDS "
# curl -s -X DELETE "$PGW_URL/metrics/job/${metricJob}/instance/${metricInstance}"
curl -s -X DELETE "$PGW_URL/metrics/job/${metricJob}"
fi
}
function check_expired_metric_loop(){
export CURRENT_TIME=$(date +%s)
METRICS_LIST=$(curl -s $PGW_URL/metrics | egrep "^push_time_seconds")
echo "$METRICS_LIST" | while read -r line || [[ -n "$line" ]]; do
check_metric_line "$line"
done
sleep $((EXPIRATION_SEConDS / 3 ))
}
while : ; do
check_expired_metric_loop
done
使用时只用替换PGW_URL地址即可
另外,这种另辟蹊径的方法其实也可以尝试,但是我嫌太麻烦了,就贴在这里记录一下吧:
https://blog.csdn.net/daijiguo/article/details/105453643



