栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Structured Streaming Kafka offset提交监控

Spark Structured Streaming Kafka offset提交监控

StreamingQueryListener
StreamingQueryListener,即监听StreamingQuery各种事件的接口,如下:
 

abstract class StreamingQueryListener {

  import StreamingQueryListener._

  // 查询开始时调用
  def onQueryStarted(event: QueryStartedEvent): Unit

  // 查询过程中状态发生更新时调用
  def onQueryProgress(event: QueryProgressEvent): Unit

  // 查询结束时调用
  def onQueryTerminated(event: QueryTerminatedEvent): Unit
}


在QueryProgressEvent中,我们是可以拿到每个Source消费的Offset的。因此,基于StreamingQueryListener,可以将消费的offset的提交到kafka集群,进而实现对Kafka Lag的监控。

基于StreamingQueryListener向Kafka提交Offset
监控Kafka Lag的关键是能够向Kafka集群提交消费的Offset,以下示例演示了如何通过StreamingQueryListener向Kafka提交Offset。

KafkaOffsetCommiter

package com.bigdata.structured.streaming.monitor

import java.util
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.OffsetAndmetadata
import org.apache.kafka.common.TopicPartition

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.slf4j.LoggerFactory




class KafkaOffsetCommiter(brokers: String, group: String) extends StreamingQueryListener {

  val logger = LoggerFactory.getLogger(this.getClass)

  // Kafka配置
  val properties= new Properties()
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  val kafkaConsumer = new KafkaConsumer[String, String](properties)

  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  // 提交Offset
  def onQueryProgress(event: QueryProgressEvent): Unit = {

    // 遍历所有Source
    event.progress.sources.foreach(source=>{

      val objectMapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        .configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
        .registerModule(DefaultScalaModule)

      val endOffset = objectMapper.readValue(source.endOffset,classOf[Map[String, Map[String, Long]]])

      // 遍历Source中的每个Topic
      for((topic,topicEndOffset) <- endOffset){
        val topicPartitionsOffset = new util.HashMap[TopicPartition, OffsetAndmetadata]()

        //遍历Topic中的每个Partition
        for ((partition,offset) <- topicEndOffset) {
          val topicPartition = new TopicPartition(topic, partition.toInt)
          val offsetAndmetadata = new OffsetAndmetadata(offset)
          topicPartitionsOffset.put(topicPartition,offsetAndmetadata)
        }

        logger.warn(s"提交偏移量... Topic: $topic Group: $group Offset: $topicEndOffset")
        kafkaConsumer.commitSync(topicPartitionsOffset)
      }
    })
  }
}


Structured Streaming App

package com.bigdata.structured.streaming.monitor

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}


object ReadKafkaApp {
  def main(args: Array[String]): Unit = {

    val kafkaBrokers="kafka01:9092,kafka02:9092,kafka03:9092"
    val kafkaGroup="read_kafka_c2"
    val kafkaTopics1="topic_1,test_2"
    val kafkaTopics2="test_3"
    val checkpointDir="/Users/wangpei/data/apps/read_kafka/checkpoint/"
    val queryName="read_kafka"

    val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate()
    import spark.implicits._

    // 添加监听器
    val kafkaOffsetCommiter = new KafkaOffsetCommiter(kafkaBrokers,kafkaGroup)
    spark.streams.addListener(kafkaOffsetCommiter)

    // Kafka数据源1
    val inputTable1=spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",kafkaBrokers )
      .option("subscribe",kafkaTopics1)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")

    // Kafka数据源2
    val inputTable2=spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",kafkaBrokers )
      .option("subscribe",kafkaTopics2)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")

    // 结果表
    val resultTable = inputTable1.union(inputTable2)

    // 启动Query
    val query: StreamingQuery =resultTable
      .writeStream
      .format("console")
      .option("truncate","false")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .queryName(queryName)
      .option("checkpointLocation", checkpointDir)
      .start()

    spark.streams.awaitAnyTermination()

  }
}


查看Kafka Offset 可通过以下命令查看Topic消费者组对应的Offset。

bin/kafka-consumer-offset-checker.sh --zookeeper kafka01:2181  --topic test_3 --group read_kafka_c2
Group           Topic                          Pid Offset          logSize         Lag             Owner
read_kafka_c2   test_3                         0   32              32              0               none
read_kafka_c2   test_3                         1   32              32              0               none
read_kafka_c2   test_3                         2   34              34              0               none
 


 
同理,可查看另外两个Topic对应的Group的Offset。

更多可参照项目:GitHub - HeartSaVioR/spark-sql-kafka-offset-committer: Kafka offset committer for structured streaming query

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389525.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号