- 1、启动集群
- 2、IDEA安装依赖
- 2.1 启动服务端监听 Socket 服务
- 2.2 实现 transform() 方法,分割多个单词
- 2.3 UpdateStateByKeyTest 更新值
- 2.4 Dstream 窗口操作
- 2.5 DStream 输出操作
启动zookeeper,hadoop,flume # 1、三个节点 /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start /usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status # 2、master节点 启动hadoop /usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
2、IDEA安装依赖
2.11.8 2.7.3 2.4.0 1.2.4 2.1.1 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-streaming_2.11 2.4.0 org.apache.spark spark-sql_2.11 2.4.0 org.apache.spark spark-mllib_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-8_2.11 ${spark.version} org.apache.spark spark-streaming-flume_2.11 2.3.0 org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-common 2.7.3 org.apache.hadoop hadoop-hdfs 2.7.3 org.apache.hbase hbase-client ${hbase.version} org.apache.hbase hbase-common ${hbase.version} org.apache.hbase hbase-server ${hbase.version} org.apache.hbase hbase-protocol ${hbase.version} org.apache.hbase hbase-annotations ${hbase.version} test-jar test org.apache.hive hive-exec ${hive.version} org.apache.hive hive-jdbc ${hive.version} org.apache.kafka kafka-clients 2.4.0 org.apache.kafka kafka-streams 2.4.0 mysql mysql-connector-java 5.1.46 com.alibaba fastjson 1.2.17 com.huaban jieba-analysis 1.0.2 junit junit 4.12 test org.scala-tools maven-scala-plugin 2.15.2 compile testCompile org.apache.maven.plugins maven-compiler-plugin 3.1 1.8 1.8 org.apache.maven.plugins maven-surefire-plugin 2.12.4 true compile
在之前安装过的只需要安装:
org.apache.kafka kafka-streams 2.4.0
2.1 启动服务端监听 Socket 服务
命令:nc -lk 9999
2.2 实现 transform() 方法,分割多个单词
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object TransformTest {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("TransformTest").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
// 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
// 以上是固定搭配结构
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
// 6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行
val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
// 7.打印输出结果
words.print()
// 8.开启流式计算
ssc.start()
// 9.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
2.3 UpdateStateByKeyTest 更新值
如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制。以便于在内存数据丢失的 时候,可以从checkpoint恢复数据。
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object UpdateStateByKeyTest {
//newValues 表示当前批次汇总成的(word,1)中相同单词的所有1
//runningCount 表示历史的所有相同key的value总和
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount =runningCount.getOrElse(0)+newValues.sum
Some(newCount)
}
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
// 5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录
ssc.checkpoint("D:\data\ch1")
// 6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("master",9999)
// 7.按空格进行切分每一行,并将切分的单词出现次数记录为1
val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word =>(word,1))
// 8.调用updateStateByKey操作,统计单词在全局中出现的次数
var result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
// 9.打印输出结果
result.print()
// 10.开启流式计算
ssc.start()
// 11.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
当注释掉 ssc.checkpoint("D:\data\ch1"),则运行会报错。
2.4 Dstream 窗口操作
- 事先设定一个滑动窗口的长度(也就是窗口的持续时间);
- 设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动;
- 每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream;
- 可以启动对这个小段DStream的计算。
| 方法名称 | 相关说明 |
|---|---|
| window(windowLength, slideInterval) | 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream; |
| countByWindow(windowLength, slideInterval) | 返回流中元素的一个滑动窗口数; |
| reduceByWindow(func, windowLength, slideInterval) | 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算; |
| reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。 |
| reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 更加高效的 reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。 |
| countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream;每个key的值都是它们在滑动窗口中出现的频率。 |
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
object WindowTest {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("WindowTest ").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
// 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc
.socketTextStream("master",9999)
// 6.按空格进行切分每一行
val words: DStream[String] = dstream.flatMap(_.split(" "))
// 7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
// 8.打印输出结果
windowWords.print()
// 9.开启流式计算
ssc.start()
// 10.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
把批处理时间间隔、窗口长度和滑动时间间隔进行变换,在其中两者非整数倍的情况下,会报错。
2.5 DStream 输出操作
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
object SaveAsTextFilesTest {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
//1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
//2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc: SparkContext = new SparkContext(sparkConf)
//3.设置日志级别
sc.setLogLevel("WARN")
//4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
//5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
//6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
dstream.saveAsTextFiles("hdfs://master:8020//saveAsTextFiles/satf","txt")
//7.开启流式计算
ssc.start()
//8.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}



