
Spark mini-case: consuming Kafka with Spark Streaming

Use Spark Streaming to consume data from Kafka and implement a word count.

Dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>
Word count implementation

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamWordCount").setMaster("local[*]")
// Micro-batch interval of 3 seconds
val ssc = new StreamingContext(conf, Seconds(3))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hadoop01:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "earliest"
)

// Subscribe to the topic and create a direct stream from Kafka
val topics = Array("topic01")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Take the message value from each record, then run the word count pipeline
val value: DStream[String] = stream.map(record => record.value())
value.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .print()

ssc.start()
ssc.awaitTermination()
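The transformation chain above (split, map, reduceByKey) is ordinary word count. As a sanity check, the same logic can be sketched with plain Scala collections, no Spark or Kafka required; the object name WordCountDemo is just for illustration:

```scala
object WordCountDemo {
  // Same split -> map -> reduce pipeline as the DStream version,
  // applied to one micro-batch worth of input lines.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    println(wordCount(Seq("hello spark", "hello kafka")))
  }
}
```

Each Spark Streaming micro-batch applies exactly this computation to the records received in that batch interval.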

Start Kafka and create a Kafka console producer:

kafka-console-producer.sh --broker-list hadoop01:9092 --topic topic01
Source: https://www.mshxw.com/it/317681.html (please credit www.mshxw.com when reposting)