object WordCount04 {
def main(args: Array[String]): Unit = {
// TODO 0.准备环境
val conf:SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc:SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5秒划分一个批次
// TODO 1.加载数据
val lines:ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
// TODO 2.处理数据
val resultDS:DStream[(String,Int)] = lines.flatMap(
_.split(" "))
.map((_,1))
.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
// TODO 3.输出结果
resultDS.print()
// TODO 4.启动并等待结束
ssc.start()
ssc.awaitTermination() //注意:流式应用程序启动之后需要已知运行等待停止/等待数据到来
// TODO 5.关闭资源
ssc.stop() //优雅关闭
}
}