栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Streaming整合Kafka实现词频统计

Spark Streaming整合Kafka实现词频统计

pom.xml



    4.0.0

    com.SparkStream
    SparkStreamspace
    1.0-SNAPSHOT

    
        src/main/scala
        src/test/scala
        
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        
                            compile
                            testCompile
                        
                        
                            
                                -dependencyfile
                                ${project.build.directory}/.scala_dependencies
                            
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-shade-plugin
                2.4.3
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    *:*
                                    
                                        meta-INF
object SparkStreaming_Kafka_createDstream {
  def main(args: Array[String]): Unit = {
    //1. 初始化参数,conf, sc, ssc
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Kafka_createDstream")
      .setMaster("local[4]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val sc: SparkContext = new SparkContext(sparkConf)
    //设置日志级别
    sc.setLogLevel("WARN")
    //创建StreamingContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    //设置检查点, 开启WLA日志保存机制就要设置检查点
    ssc.checkpoint("./Kafka_Receiver")

    //2. 从kafka中拉取数据, KafKaUtil
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val groupId = "spark_receiver"
    //这里的1, 代表每一个分区被N个消费者消费
    val topics = Map("kafka_spark" -> 1)

    val receiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3)
      .map(x => {
        val stream: ReceiverInputDStream[(String, String)] = KafkaUtils
          .createStream(ssc, zkQuorum, groupId, topics)
        stream
      })

    //3. 从主体中获取具体的数据, 也就是value值, key是offect
    val unionDstream: DStream[(String, String)] = ssc.union(receiverDstream)

    //4. 单词计数
    val topicData: DStream[String] = unionDstream.map(_._2)

    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1))

    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //5. 打印
    result.print()

    //6. 开启流模式
    ssc.start()
    ssc.awaitTermination()

  }
}

开启zookeeper和kafka集群。

创建主题

kafka-topics.sh --create 
--topic kafka_spark 
--partitions 3 
--replication-factor 1 
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

启动生产者

kafka-console-producer.sh 
--broker-list hadoop01:9092 
--topic kafka_spark


SparkStreaming_Kafka_createDirectStream.scala

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

//todo:利用sparkStreaming对接kafka实现单词计数----采用Direct(低级API)
object SparkStreaming_Kafka_createDirectStream {
  def main(args: Array[String]): Unit = {
    //1、创建sparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Kafka_createDirectStream")
      .setMaster("local[2]")
    //2、创建sparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3、创建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    ssc.checkpoint("./Kafka_Direct")
    //4、配置kafka相关参数
    val kafkaParams=Map("metadata.broker.list"->"hadoop01:9092,hadoop02:9092,hadoop03:9092","group.id"->"spark_direct")
    //5、定义topic
    val topics=Set("kafka_direct0")
    //6、通过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理
    val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    //7、获取kafka中topic中的数据
    val topicData: DStream[String] = dstream.map(_._2)
    //8、切分每一行,每个单词计为1
    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
    //9、相同单词出现的次数累加
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //10、打印输出
    result.print()
    //开启计算
    ssc.start()
    ssc.awaitTermination()
  }
}

创建主题

kafka-topics.sh --create 
--topic kafka_direct0 
--partitions 3 
--replication-factor 1 
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

启动生产者

kafka-console-producer.sh 
--broker-list hadoop01:9092 
--topic kafka_direct0 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735750.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号