栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink教程(27)- Flink Metrics监控

Flink教程(27)- Flink Metrics监控

文章目录

01 引言02 Metrics概述

2.1 Metrics介绍2.2 Metrics 类型 03 WebUI监控

3.1 自定义监控指标 04 REST API监控

4.1 http请求获取监控数据

4.1.1 获取flink任务运行状态4.1.2 获取 job 详情 4.2 开发者模式获取指标url4.3 代码中Flink任务运行状态 05 文末

01 引言

在前面的博客,我们学习了Flink的多语言开发了,有兴趣的同学可以参阅下:

《Flink教程(01)- Flink知识图谱》《Flink教程(02)- Flink入门》《Flink教程(03)- Flink环境搭建》《Flink教程(04)- Flink入门案例》《Flink教程(05)- Flink原理简单分析》《Flink教程(06)- Flink批流一体API(Source示例)》《Flink教程(07)- Flink批流一体API(Transformation示例)》《Flink教程(08)- Flink批流一体API(Sink示例)》《Flink教程(09)- Flink批流一体API(Connectors示例)》《Flink教程(10)- Flink批流一体API(其它)》《Flink教程(11)- Flink高级API(Window)》《Flink教程(12)- Flink高级API(Time与Watermaker)》《Flink教程(13)- Flink高级API(状态管理)》《Flink教程(14)- Flink高级API(容错机制)》《Flink教程(15)- Flink高级API(并行度)》《Flink教程(16)- Flink Table与SQL》《Flink教程(17)- Flink Table与SQL(案例与SQL算子)》《Flink教程(18)- Flink阶段总结》《Flink教程(19)- Flink高级特性(BroadcastState)》《Flink教程(20)- Flink高级特性(双流Join)》《Flink教程(21)- Flink高级特性(End-to-End Exactly-Once)》《Flink教程(22)- Flink高级特性(异步IO)》《Flink教程(23)- Flink高级特性(Streaming File Sink)》《Flink教程(24)- Flink高级特性(File Sink)》《Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)》《Flink教程(26)- Flink多语言开发》

本文主要讲解Flink的Metrics监控。

02 Metrics概述

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html

2.1 Metrics介绍

由于集群运行后很难发现内部的实际状况,跑得慢或快,是否异常等,开发人员无法实时查看所有的 Task 日志,比如作业很大或者有很多作业的情况下,该如何处理?此时 Metrics 可以很好的帮助开发人员了解作业的当前状况

Flink 提供的Metrics可以在 Flink内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。

2.2 Metrics 类型

Metrics 的类型如下:

类型解析
Counter写过 mapreduce 作业的开发人员就应该很熟悉 Counter,其实含义都是一样的,就是对一个计数器进行累加,即对于多条数据和多兆数据一直往上加的过程
GaugeGauge 是最简单的Metrics,它反映一个值。比如要看现在Java heap 内存用了多少,就可以每次实时的暴露一个 Gauge,Gauge当前的值就是heap使用的量
MeterMeter是指统计吞吐量和单位时间内发生“事件”的次数。它相当于求一种速率,即事件次数除以使用的时间
HistogramHistogram 比较复杂,也并不常用,Histogram 用于统计一些数据的分布,比如说 Quantile、Mean、StdDev、Max、Min 等

Metric 在Flink 内部有多层结构,以 Group 的方式组织,它并不是一个扁平化的结构,Metric Group + Metric Name 是 Metrics 的唯一标识。

03 WebUI监控

在flink的UI的界面上点击任务详情,然后点击Task Metrics会弹出如下的界面,在 add metic按钮上可以添加我需要的监控指标。

3.1 自定义监控指标

○ 案例:在map算子内计算输入的总数据

○ 设置MetricGroup为:flink_test_metric

○ 指标变量为:mapDataNub

参考代码:

public class WordCount5_Metrics {
    public static void main(String[] args) throws Exception {
        //1.准备环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.准备数据-source
        //2.source
        DataStream linesDS = env.socketTextStream("node1", 9999);
        //3.处理数据-transformation
        DataStream wordsDS = linesDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                //value就是一行行的数据
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//将切割处理的一个个的单词收集起来并返回
                }
            }
        });
        //3.2对集合中的每个单词记为1
        DataStream> wordAndonesDS = wordsDS.map(new RichMapFunction>() {
            Counter myCounter;
            @Override
            public void open(Configuration parameters) throws Exception {
                myCounter= getRuntimeContext().getMetricGroup().addGroup("myGroup").counter("myCounter");
            }

            @Override
            public Tuple2 map(String value) throws Exception {
                myCounter.inc();
                //value就是进来一个个的单词
                return Tuple2.of(value, 1);
            }
        });
        //3.3对数据按照单词(key)进行分组
        KeyedStream, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
        DataStream> result = groupedDS.sum(1);

        //4.输出结果-sink
        result.print().name("mySink");

        //5.触发执行-execute
        env.execute();
    }
}
// /export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
// /export/server/flink/bin/flink run --class cn.itcast.hello.WordCount5_Metrics /root/metrics.jar
// 查看WebUI

程序启动之后就可以在任务的ui界面上查看:

04 REST API监控

前面介绍了flink公共的监控指标以及如何自定义监控指标,那么实际开发flink任务我们需要及时知道这些监控指标的数据,去获取程序的健康值以及状态。

这时候就需要我们通过flink REST API,自己编写监控程序去获取这些指标。很简单,当我们知道每个指标请求的URL,我们便可以编写程序通过http请求获取指标的监控数据。

对于 flink on yarn 模式来说,则需要知道RM代理的 JobManager UI地址
格式: http://Yarn-WebUI-host:port/proxy/application_id

如:http://node1:8088/proxy/application_1609508087977_0004/jobs

4.1 http请求获取监控数据 4.1.1 获取flink任务运行状态

我们可以在浏览器进行测试,输入如下的连接:http://node1:8088/proxy/application_1609508087977_0004/jobs

返回信息:

{
    jobs: [{
            id: "ce793f18efab10127f0626a37ff4b4d4",
            status: "RUNNING"
        }
    ]
}
4.1.2 获取 job 详情

请求地址:http://node1:8088/proxy/application_1609508087977_0004/jobs/925224169036ef3f03a8d7fe9605b4ef

返回结果:

{
    jid: "ce793f18efab10127f0626a37ff4b4d4",
    name: "Test",
    isStoppable: false,
    state: "RUNNING",
    start - time: 1551577191874,
    end - time: -1,
    duration: 295120489,
    now: 1551872312363,
    。。。。。。
      此处省略n行
    。。。。。。
            }, {
                id: "cbc357ccb763df2852fee8c4fc7d55f2",
                parallelism: 12,
                operator: "",
                operator_strategy: "",
                description: "Source: Custom Source -> Flat Map",
                optimizer_properties: {}
            }
        ]
    }
}
4.2 开发者模式获取指标url

指标非常多,不需要记住每个指标的请求的URL格式?可以进入flink任务的UI界面,按住F12进入开发者模式,然后我们点击任意一个metric指标,便能立即看到每个指标的请求的URL。

比如获取flink任务的背压情况:
如下图我们点击某一个task的status,按一下f12,便看到了backpressue,点开backpressue就是获取任务背压情况的连接如下:http://node1:8088/proxy/application_1609508087977_0004/jobs/925224169036ef3f03a8d7fe9605b4ef/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure
请求连接返回的json字符串如下:我们可以获取每一个分区的背压情况,如果不是OK状态便可以进行任务报警,其他的指标获取监控值都可以这样获取 简单而又便捷。

4.3 代码中Flink任务运行状态

使用 flink REST API的方式,通过http请求实时获取flink任务状态,不是RUNNING状态则进行短信、电话或邮件报警,达到实时监控的效果。

public class MetricsTest {
    public static void main(String[] args) {
        String result = sendGet("http://node1:8088/proxy/application_1609508087977_0004/jobs");
        System.out.println(result);
    }

    public static String sendGet(String url) {
        String result = "";
        BufferedReader in = null;
        try {
            String urlNameString = url;
            URL realUrl = new URL(urlNameString);
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();
            in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }

}
05 文末

本文主要讲解Flink的监控,谢谢大家的阅读,本文完!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760887.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号