栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink自定义指标发送到pushgateway写入prometheus

flink自定义指标发送到pushgateway写入prometheus

flink官方提供了写metrics的方式,但是相对来说有些不灵活,不符合我当前的要求,也没法自定义动态的label值,于是自定义了sink写入到pushgateway里。
代码如下:

  class MyPushGateWaySink(pushgatewayipport:String) extends RichSinkFunction[(String,String,String)] {

    var pushgateway:PushGateway = _
    var gauge:Gauge = Gauge.build
      .name("name")
      .help("help")
      .labelNames("label1","label2", "label3").register
    // open函数用于初始化富函数运行时的上下文等环境
    override def open(parameters: Configuration): Unit = {
      println("----------------------------初始化连接-------------------------")
      super.open(parameters)
      pushgateway= new PushGateway(pushgatewayipport)
    }
    override def invoke(value: (String,String,String), context: SinkFunction.Context): Unit = {
      val timestamp=System.currentTimeMillis()
      gauge.labels(value._1,value._2,value._3).set(timestamp)
      pushgateway.push(gauge,"name")
    }
    // 关闭时做清理工作
    override def close(): Unit = {
      println("-----------------------关闭连接-----------------------")
    }
  }
   kafkaStream.addSink(new MyPushGateWaySink(pushgatewayipport))

所需依赖如下:


    io.dropwizard.metrics
    metrics-core
    4.1.16



    io.prometheus
    simpleclient
    0.9.0



    io.prometheus
    simpleclient_httpserver
    0.9.0



    io.prometheus
    simpleclient_pushgateway
    0.9.0

但是运行一直报无法序列化问题,于是修改源码,添加序列化
因为使用的是Gauge类型,所以对该类添加序列化,如下图

同时父类也添加序列化

两个类的路径如下

再次运行,数据成功发送到pushgateway

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/683263.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号