栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkStreaming-----第一个wordcount,算子,Driver HA

SparkStreaming-----第一个wordcount,算子,Driver HA

1.sparkStreaming
流式处理框架,是Spark API的扩展,RDD最终封装到DStream中

2.第一个wordcount

pom依赖


    org.apache.spark
    spark-streaming_2.12
    3.0.0
    provided

import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}



object SparkStreaming01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("streaming01")
    conf.setMaster("local[2]")  //一个线程接收数据,一个线程处理数据
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10)) //设置每10秒执行一次处理数据
    val lines = ssc.socketTextStream("hadoop102", 999)  //设置了虚拟机和端口号
    //统计所有单词出现次数
    val words=lines.flatMap(line=>line.split(" "))
    val pairWords=words.map(word=>new Tuple2(word,1))
    val result= pairWords.reduceByKey((v1,v2)=>{v1 + v2})
    //output operation类算子
    result.print()
    ssc.start()  //启动sparkStreaming
    ssc.awaitTermination()
  }
}

数据来源

3.foreachRDD算子

1.foreachRDD可以获取DStream中的RDD,可以对RDD使用RDD的算子操作,但是一定要使用RDD的action算子触发执行

 result.foreachRDD((rdd: RDD[(String, Int)]) => {
      val rdd1: RDD[String] = rdd.map(tp => {
        println("======="+tp)
        tp._1 + "=" + tp._2
      })
      rdd1.count()
    })

4.transform
transformation类算子,对Dstream做RDD到RDD的任意操作

5.updateStateByKey
transformation类算子,对每一个key的状态进行更新

6.Driver HA

第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629527.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号