Spark Streaming抽象、架构与原理
Spark Streaming 背压(Back Pressure)机制
环境配置同020 Spark SQL,并启动HIVE
package com.atguigu.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
System.setProperty("HADOOP_USER_NAME", "atguigu")
// 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("./ck")
// Create a DStream that will connect to hostname:port, like hadoop102:9999
val lines = ssc.socketTextStream("hadoop102", 9998)
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
// 使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数
val stateDstream = pairs.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
//ssc.stop()
}
}
3、滑动窗口操作nc -lk 9999
Spark Streaming之window(窗口操作)
优雅关闭spark streaming job填坑之路
sparkStreaming 连接数据库 --设计模式
回调方法?钩子方法?模板模式?
scala学习笔记-case class



