栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

问题描述

spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错信息如图:

代码
object testScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fs01")
      .setMaster("local[*]")
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.kafka.maxRatePerPartition","10")

    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val interval = PropertiesUtils.loadProperties("streaming.interval").toLong
    val ssc:StreamingContext = new StreamingContext(sc, Seconds(5))
    val kalfa_server_list: String = PropertiesUtils.loadProperties("kafka.broker.list")
    val kafka_group: String = "group_test_role_1"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" ->  kalfa_server_list, 
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" ->  kafka_group, //消费者组名kafka.groupId
      "auto.offset.reset" -> "earliest", //earliest可以获取历史数据
      "enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Array("t_2021-09") 
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, //多数时候采用该方式,在所有可用的executor上均匀分配kafka的主题的所有分区。
      Subscribe[String, String](topics, kafkaParams))
    val cacheOper = kafkaStream.transform(rdd=>{
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val check = rdd.map(x => {
           x.value()})//.cache()
      check
    }) 
     cacheOper.window(Seconds(20),Seconds(10)).foreachRDD(rdd=>{
      rdd.foreach(x=>{
        println("data:"+x)
        Thread.sleep(2000L)
      })
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
解决办法

添加 conf.set(“spark.streaming.kafka.consumer.cache.enabled”, “false”)
这个问题发生的原因是spark 缓存问题,可以查看官网spark streaming整合kafka官网地址

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/618040.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号