栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

sparkstreaming自定义kafka

sparkstreaming自定义kafka

自动提交:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.codehaus.jackson.map.deser.std.StringDeserializer


object sparkStreaming_kafka_Demo {
  def main(args: Array[String]): Unit = {


    // TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5秒划分一个批次
    ssc.checkpoint("./ckp")

    // TODO 1.加载数据-从Kafka加载
    val kafkaParams = Map[String,Object](
      "bootstrap.servers"->"localhost:9092", //Kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
      "group.id" -> "sparkdemo", // 消费者组名称
      // earliest:表示如果由offset记录从offset记录开始消费,如果没有从最早的消息开始消费
      // latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
      // none: 表示如果有offset记录从offset记录开始消费,如果没有则报错
      "auto.offset.reset"->"latest", //如果由offset记录从offset开始消费,没有则从最新的开始消费
      "auto.commit.interval.ms" ->"10000", //自动提交的时间间隔
      "enable.auto.commit" -> (true:java.lang.Boolean) //是否自动提交
    )

    val topics =  Array("topicA")  //要订阅的主题

    // TODO 4.读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, //传入StreamingContext
      LocationStrategies.PreferConsistent, //位置策略
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略
    )      // 消费策略,使用源码中推荐的

    // TODO 5.处理消息
    val infoDS:DStream[String] = kafkaDStream.map(record => {
      val topic: String = record.topic()
      val partition: Int = record.partition()
      val offset: Long = record.offset()
      val key: String = record.key()
      val value: String = record.value()
      val info: String =
        s"""
           |topic:${topic},
           |partition:${partition},
           |offset:${offset},
           |key:${key}
           |value:${value}
           |"""
      info
    })

    // TODO 6.输出结果
    infoDS.print()

    // TODO 7.启动并等待结束
    ssc.start()
    ssc.awaitTermination() // 注意:流式应用程序启动之后需要已知运行等待手动停止/等待数据到来

    // TODO 8.关闭资源
    ssc.stop()

  }

}

手动提交:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.deser.std.StringDeserializer


object sparkStreaming_kafka_Demo2 {
  def main(args: Array[String]): Unit = {


    // TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5秒划分一个批次
    ssc.checkpoint("./ckp")

    // TODO 1.加载数据-从Kafka加载
    val kafkaParams = Map[String,Object](
      "bootstrap.servers"->"localhost:9092", //Kafka集群地址
      "key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
      "group.id" -> "sparkdemo", // 消费者组名称
      // earliest:表示如果由offset记录从offset记录开始消费,如果没有从最早的消息开始消费
      // latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
      // none: 表示如果有offset记录从offset记录开始消费,如果没有则报错
      "auto.offset.reset"->"latest", //如果由offset记录从offset开始消费,没有则从最新的开始消费
//      "auto.commit.interval.ms" ->"10000", //自动提交的时间间隔
      "enable.auto.commit" -> (true:java.lang.Boolean) //是否自动提交
    )

    val topics =  Array("topicA")  //要订阅的主题

    // TODO 4.读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))  // 消费策略,使用源码中推荐的

    // TODO 5.处理消息
    // 注意提交的时机: 应该是消费完一小批久提交一次offset
    kafkaDStream.foreachRDD(rdd=>{
      if(!rdd.isEmpty()){
      rdd.foreach(record=>{
        val topic: String = record.topic()
        val partition: Int = record.partition()
        val offset: Long = record.offset()
        val key: String = record.key()
        val value: String = record.value()
        val info: String =
          s"""
             |topic:${topic},
             |partition:${partition},
             |offset:${offset},
             |key:${key}
             |value:${value}
             |"""
        println("消费到的消息的详细信息为:" +info)
      })
      // 获取rdd中offset相关的信息:offsetRanges里面包含了该批次各个分区的offset信息
      val offsetRanges:Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 提交
      kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      print("当前批次的数据已消费并手动提交")
    }
  })



    // TODO 7.启动并等待结束
    ssc.start()
    ssc.awaitTermination() // 注意:流式应用程序启动之后需要已知运行等待手动停止/等待数据到来

    // TODO 8.关闭资源
    ssc.stop()

  }

}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/335807.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号