栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark综合学习笔记(七)SparkStreaming案例1 WordCount

Spark综合学习笔记(七)SparkStreaming案例1 WordCount

学习致谢:

https://www.bilibili.com/video/BV1Xz4y1m7cv?p=41

需求:

从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:

准备工作
1.在node01上安装nc命令
nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

yum install -y nc
代码的实现:
package streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object WordCount {
  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val conf:SparkConf=new SparkConf().setMaster("spark").setMaster("local[*]")
    val sc:   SparkContext=new SparkContext(conf)
    sc.setLogLevel("WARN")
    //the time interval at which streaming data will be dicided into batches
    val ssc:StreamingContext= new StreamingContext(sc,Seconds(5))
    //TODO 1.加载数据
    val lines:ReceiverInputDStream[String]=ssc.socketTextStream("node1",9999)
    //TODO 2.处理数据
    val resuleDS:DStream[(String,Int)]=lines.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)

    //TODO 3.输出结果
    resuleDS.print()
    //TODO 4.启动并等待结束
    ssc.start()
    ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待停止、等待到来
    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)//优雅关闭
  }
}

虚拟机端输入

nc -lk 9999

然后输入每个流的数据信息,依据回车判定一批的数据
每隔5秒算一批,如果没有发,则为空

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583806.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号