栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkStreaming业务逻辑处理的一些高级算子

SparkStreaming业务逻辑处理的一些高级算子

1、reduceByKey

  reduceByKey 是按key进行计算,操作的数据是每个批次内的数据(一个采集周期),不能跨批次计算。如果需要实现对历史数据的跨批次统计累加,则需要使用updateStateByKey算子或者mapWithState算子。

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamingWordCountScala {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、初始化程序入口
    
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //二、获取数据流,就是数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.244.130", 1234)

    //三、数据处理
    //val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //四、数据输出查看
    wordResult.print()

    //五、启动任务
    ssc.start()  //启动
    ssc.awaitTermination()  //线程等待,等待处理下一批次任务
    ssc.stop()  //关闭
  }
}

2、updateStateByKey

 updateStateByKey 算子是统计历史所有的数据,实现累加

有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordCount)。 针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。重点:首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加

注意:

reduceByKey 是无状态操作,即操作的数据都是每个批次内的数据(一个采集周期)updateStateByKey 是状态操作,即操作从启动到当前的所有采集周期内的数据(跨批次操作)

以WordCount计算为例:

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object UpdateStateByKeyDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、初始化程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //为了实现对历史数据的累加,需要设置检查点目录
    ssc.checkpoint("D:\Java Project\DATA\UpdateStateByKeyDemo_checkpoint")

    //二、读取数据流,就是数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999)

    //三、数据处理
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    
    val wordResult: DStream[(String, Int)] = wordAndOne.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum     //将目前新进来的批次的所有value值相加
      val lastCount = state.getOrElse(0)   //取出之前累加统计的历史状态值
      Some(currentCount + lastCount)   //目前值的和加上历史值,完成状态的更新
    })

    //四、数据输出
    wordResult.print()

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()  //线程等待,等待处理下一批次任务
    ssc.stop()
  }
}
3、mapWithState

  mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,在没有设置全局输出的情况下,默认不会返回之前的key的状态,类似于增量的感觉。

注意:mapWithState算子比updateStateByKey效率更高,因为:

updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(在生产环境中建议使用这个)。

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}


object MapWithStateDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、初始化程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    //val sc: SparkContext = new SparkContext(conf)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //设置检查点目录
    ssc.checkpoint("D:\Java Project\DATA\MapStateByKeyDemo_checkpoint")

    //二、读取数据流,也就是数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999)

    //三、数据处理
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    //可以设置初始值
    val initialRDD: RDD[(String, Long)] = ssc.sparkContext.parallelize(List(("flink", 100L), ("spark", 50L)))

    

    
    val stateSpec: StateSpec[String, Int, Long, (String, Long)] = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], state: State[Long]) => {
      val sum = value.getOrElse(0).toLong + state.getOption().getOrElse(0L)
      val output = (key, sum)
      //更新状态值,如果你的数据没有超时的话
      if (!state.isTimingOut()) {
        state.update(sum)
      }
      Some(output) //返回值,要求返回的是key-value类型的
    }).initialState(initialRDD)   //设置初始值
      .numPartitions(2).timeout(Seconds(10))
    //timeout:超时。当一个key超过Seconds(10)这个时间没有接收到新数据的时候,这个key以及对应的状态会被移除掉,也就是重新统计。

    val result: MapWithStateDStream[String, Int, Long, (String, Long)] = wordAndOne.mapWithState(stateSpec)

    //四、数据输出
    //result.print()    //打印出来发生变化的数据
    result.stateSnapshots().print()   //打印出来的是全量的数据

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
4、transform算子实现黑名单过滤
package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object TransformDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、设置程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //二、获取数据流,即数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9993)

    //三、数据处理
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    //具体的黑名单操作   定义黑名单的规则:$ ? ! 过滤掉
    //定义黑名单   首先要获取到黑名单,企业中可以从Mysql,Redis里面去获取。
    val filterRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("$", "?", "!")).map((_, true))
    //优化:给过滤的规则数据通过广播变量广播出去
    val filterBroadCast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(filterRDD.collect())

    //实现过滤
    val filterResult: DStream[(String, Int)] = wordAndOne.transform(rdd => {
      val filterRDD2: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(filterBroadCast.value)
      
      val result: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD2)
      val joinResult: RDD[(String, (Int, Option[Boolean]))] = result.filter(tuple => {
        tuple._2._2.isEmpty      //过滤出来我们想要的数据
      })
      //在Scala里面最后一行就是方法的返回值
      joinResult.map(tuple => (tuple._1, tuple._2._1))

      //将黑名单字符替换成 * 号
      
    })

    //实现累加
    val finalResult: DStream[(String, Int)] = filterResult.reduceByKey(_ + _)

    //四、数据输出
    finalResult.print()   //打印出来发生变化的数据

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
5、Window操作——reduceByKeyAndWindow算子

  reduceByKeyAndWindow 窗口函数允许你在一个滑动的窗口中进行计算。

所有这些窗口操作都需要两个参数 windowLength(窗口大小,即窗口的持续时间) 和 slideInterval(滑动间隔,即执行窗口操作的间隔);比如说我们现在要每隔2秒,统计前4秒内每一个单词出现的次数,这个时候就需要用这个窗口函数了;请注意:窗口大小和滑动间隔必须是间隔的整数倍。

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object WindowDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    //一、初始化程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))

    //二、获取数据流,即数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9995)

    //三、数据处理
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))

    
    val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(6), Seconds(4))

    //四、数据输出
    result.print()

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
6、SparkStreaming和SparkSQL整合

  SparkStreaming和SparkSQL整合之后,就非常的方便,可以使用SQL的方式操作相应的数据,很方便。

package com.sparkscala.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object StreamAndSQLDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    //一、设置程序入口
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))

    //二、获取数据流,即数据源
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9996)

    //三、数据处理
    //这里必须先转换成DStream才能进行下面的转换 toDF 操作
    val words: DStream[String] = lines.flatMap(_.split(" "))
    //获取到一个一个的单词
    words.foreachRDD(rdd => {
      val spark: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      //隐式转换
      val wordDataframe: Dataframe = rdd.toDF("word")
      //注册一个临时视图
      wordDataframe.createOrReplaceTempView("words")
      //数据输出
      spark.sql("select word, count(*) as totalCount from words group by word").show()
    })

    //五、启动任务
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742988.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号