reduceByKey 是按key进行计算,操作的数据是每个批次内的数据(一个采集周期),不能跨批次计算。如果需要实现对历史数据的跨批次统计累加,则需要使用updateStateByKey算子或者mapWithState算子。
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCountScala {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、初始化程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
//二、获取数据流,就是数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.244.130", 1234)
//三、数据处理
//val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//四、数据输出查看
wordResult.print()
//五、启动任务
ssc.start() //启动
ssc.awaitTermination() //线程等待,等待处理下一批次任务
ssc.stop() //关闭
}
}
2、updateStateByKey
updateStateByKey 算子是统计历史所有的数据,实现累加
有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordCount)。 针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。重点:首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加
注意:
reduceByKey 是无状态操作,即操作的数据都是每个批次内的数据(一个采集周期)updateStateByKey 是状态操作,即操作从启动到当前的所有采集周期内的数据(跨批次操作)
以WordCount计算为例:
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object UpdateStateByKeyDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、初始化程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
//为了实现对历史数据的累加,需要设置检查点目录
ssc.checkpoint("D:\Java Project\DATA\UpdateStateByKeyDemo_checkpoint")
//二、读取数据流,就是数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999)
//三、数据处理
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val wordResult: DStream[(String, Int)] = wordAndOne.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum //将目前新进来的批次的所有value值相加
val lastCount = state.getOrElse(0) //取出之前累加统计的历史状态值
Some(currentCount + lastCount) //目前值的和加上历史值,完成状态的更新
})
//四、数据输出
wordResult.print()
//五、启动任务
ssc.start()
ssc.awaitTermination() //线程等待,等待处理下一批次任务
ssc.stop()
}
}
3、mapWithState
mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,在没有设置全局输出的情况下,默认不会返回之前的key的状态,类似于增量的感觉。
注意:mapWithState算子比updateStateByKey效率更高,因为:
updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(在生产环境中建议使用这个)。
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}
object MapWithStateDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、初始化程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
//val sc: SparkContext = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
//设置检查点目录
ssc.checkpoint("D:\Java Project\DATA\MapStateByKeyDemo_checkpoint")
//二、读取数据流,也就是数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999)
//三、数据处理
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//可以设置初始值
val initialRDD: RDD[(String, Long)] = ssc.sparkContext.parallelize(List(("flink", 100L), ("spark", 50L)))
val stateSpec: StateSpec[String, Int, Long, (String, Long)] = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], state: State[Long]) => {
val sum = value.getOrElse(0).toLong + state.getOption().getOrElse(0L)
val output = (key, sum)
//更新状态值,如果你的数据没有超时的话
if (!state.isTimingOut()) {
state.update(sum)
}
Some(output) //返回值,要求返回的是key-value类型的
}).initialState(initialRDD) //设置初始值
.numPartitions(2).timeout(Seconds(10))
//timeout:超时。当一个key超过Seconds(10)这个时间没有接收到新数据的时候,这个key以及对应的状态会被移除掉,也就是重新统计。
val result: MapWithStateDStream[String, Int, Long, (String, Long)] = wordAndOne.mapWithState(stateSpec)
//四、数据输出
//result.print() //打印出来发生变化的数据
result.stateSnapshots().print() //打印出来的是全量的数据
//五、启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
4、transform算子实现黑名单过滤
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object TransformDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、设置程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
//二、获取数据流,即数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9993)
//三、数据处理
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
//具体的黑名单操作 定义黑名单的规则:$ ? ! 过滤掉
//定义黑名单 首先要获取到黑名单,企业中可以从Mysql,Redis里面去获取。
val filterRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("$", "?", "!")).map((_, true))
//优化:给过滤的规则数据通过广播变量广播出去
val filterBroadCast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(filterRDD.collect())
//实现过滤
val filterResult: DStream[(String, Int)] = wordAndOne.transform(rdd => {
val filterRDD2: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(filterBroadCast.value)
val result: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD2)
val joinResult: RDD[(String, (Int, Option[Boolean]))] = result.filter(tuple => {
tuple._2._2.isEmpty //过滤出来我们想要的数据
})
//在Scala里面最后一行就是方法的返回值
joinResult.map(tuple => (tuple._1, tuple._2._1))
//将黑名单字符替换成 * 号
})
//实现累加
val finalResult: DStream[(String, Int)] = filterResult.reduceByKey(_ + _)
//四、数据输出
finalResult.print() //打印出来发生变化的数据
//五、启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
5、Window操作——reduceByKeyAndWindow算子
reduceByKeyAndWindow 窗口函数允许你在一个滑动的窗口中进行计算。
所有这些窗口操作都需要两个参数 windowLength(窗口大小,即窗口的持续时间) 和 slideInterval(滑动间隔,即执行窗口操作的间隔);比如说我们现在要每隔2秒,统计前4秒内每一个单词出现的次数,这个时候就需要用这个窗口函数了;请注意:窗口大小和滑动间隔必须是间隔的整数倍。
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
//一、初始化程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(2))
//二、获取数据流,即数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9995)
//三、数据处理
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(6), Seconds(4))
//四、数据输出
result.print()
//五、启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
6、SparkStreaming和SparkSQL整合
SparkStreaming和SparkSQL整合之后,就非常的方便,可以使用SQL的方式操作相应的数据,很方便。
package com.sparkscala.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamAndSQLDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
//一、设置程序入口
val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
//二、获取数据流,即数据源
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9996)
//三、数据处理
//这里必须先转换成DStream才能进行下面的转换 toDF 操作
val words: DStream[String] = lines.flatMap(_.split(" "))
//获取到一个一个的单词
words.foreachRDD(rdd => {
val spark: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
//隐式转换
val wordDataframe: Dataframe = rdd.toDF("word")
//注册一个临时视图
wordDataframe.createOrReplaceTempView("words")
//数据输出
spark.sql("select word, count(*) as totalCount from words group by word").show()
})
//五、启动任务
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}



