栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkStreaming累加计算单词频率

SparkStreaming累加计算单词频率

SparkStreaming累加计算单词频率
    • 一、需求分析
    • 二、实验环境
    • 三、思路分析
    • 四、编程实现

一、需求分析

在服务器端不断产生数据的时候,sparkstreaming客户端需要不断统计服务器端产生的相同数据出现的总数,即累计服务器端产生的相同数据的出现的次数。

二、实验环境

centos7 + nc + spark2.4.8 + windows + idea

三、思路分析
  • 流程分析
  • 思路分析:

    每次客户端程序处理服务器端数据后,将其结果缓存在检查点中,下一次客户端读入数据并处理数据时会去检查点根据key查询和进行更新,并重新将结果更新到检查点中。
    检查点:本质上就是对应于HDFS上的一个目录,将数据写入到该目录下以文件的形式将结果保存下来。故,需要先在hdfs上创建检查点对应的目录。

四、编程实现
  • 实验步骤:
    • 编写客户端处理程序,程序如下:

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      object MyTotalNetworkWordCount {
        def main(args: Array[String]): Unit = {
          //创建一个Context对象: StreamingContext (SparkContext, SQLContext)
          //指定批处理的时间间隔
          val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
          val ssc = new StreamingContext(conf,Seconds(5))
          //设置检查点
          ssc.checkpoint("hdfs://hadoop001:9000/spark/checkpoint")
      
          //创建一个DStream,处理数据,hadoop001为虚拟机的主机名,端口号为netcat服务的端口号
          val lines = ssc.socketTextStream("hadoop001",6666,StorageLevel.MEMORY_AND_DISK_SER)
      
          //执行wordcount
          val words = lines.flatMap(_.split(" "))
      
          //定义函数用于累计每个单词的总频率
          val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
            //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和
            val currentCount = currValues.sum
            // 已累加的值
            val previousCount = prevValueState.getOrElse(0)
            // 返回累加后的结果,是一个Option[Int]类型
            Some(currentCount + previousCount)
          }
      
          val pairs = words.map(word => (word, 1))
      
          val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
          totalWordCounts.print()
      
          ssc.start()
          ssc.awaitTermination()
        }
      }
      
    • 运行程序

    • 在Linux中启动nc: nc -l -p 6666

    • 输入测试数据,如I love Guizhou等后,每输入一次数据执行一次回车:

    • 观察客户端程序控制台是否有结果出现,如图所示:

    • 查看下检查点是否有数据:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487187.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号