Spark Streaming是Spark Core API(Spark RDD)的扩展,支持对实时数据流进行可伸缩、高吞吐量及容错处理。数据可以从Kafka、Flume、Kinesis或TCPSocket等多种来源获取,并且可以使用复杂的算法处理数据,这些算法由map()、reduce()、join()和window()等高级函数表示。
Spark Streaming提供了一种高级抽象,称为DStream(Discretized Stream)。 在内部,对输入数据流拆分成的每个批次实际上是一个RDD,一个DStream由多个RDD组成,相当于一个RDD序列。
与RDD类似,许多普通RDD上可用的操作算子DStream也支持。使用这些算子可以修改输入DStream中的数据,进而创建一个新的DStream。对DStream的操作主要有3种:无状态操作、状态操作、窗口操作。
状态操作是指,需要把当前时间批次和历史时间批次的数据进行累加计算,即当前时间批次的处理需要使用之前批次的数据或中间结果。使用updateStateByKey()算子可以保留key的状态,并持续不断地用新状态更新之前的状态。
在DStream上使用persist()方法可以将该DStream的每个RDD持久化到内存中。这在DStream中的数据需要被计算多次(例如,对同一数据进行多次操作)时非常有用。对于基于窗口的操作(如reduceByWindow()、reduceByKeyAndWindow()),以及基于状态的操作(如updateStateByKey(),都默认开启了persist()。
Spark Streaming应用程序必须全天候运行,因此与应用程序逻辑无关的故障(例如系统故障、JVM崩溃等)不应该对其产生影响。为此,Spark Streaming需要对足够的数据设置检查点,存储到容错系统中,使其能够从故障中恢复。
一、接收websock数据并累加处理- 创建流context
- 设置checkpoint
- 连接端口
- 进行累加处理
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
object SparkStream {
val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values:Seq[Int], state:Option[Int]) => {
val currentCount = values.sum // 累加当前批次单词
val previousCount = state.getOrElse(0) //获取上一批次单词,默认为0
Some(currentCount+previousCount) // 加和
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WordCount")
val ssc = new StreamingContext(conf, Seconds(10)) // 设置10秒统计一次
ssc.checkpoint("hdfs://localhost:9000/spark-ck") // 设置检查点
val lines = ssc.socketTextStream("localhost", 9999) // DStream通过连接端口获取数据
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val result:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc) // 进行累加操作
result.print() // 打印前10行
ssc.start() // 启动
ssc.awaitTermination() // 等待完成
}
}
打开终端,开启9999端口:
(base) [~/softwares/kafka_2.12-2.8.1]$ nc -lk 9999 1 2 3 4 5 6 hello world二、kafka源的统计字数处理
首先添加依赖:
org.apache.spark spark-streaming-kafka-0-10_2.12 3.1.2
- 创建StringContext
- 对kafka进行配置
- 通过 KafkaUtils.createDirectStream进行直连
- 获取DStream进行累加处理
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
object StreamKafka {
val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values:Seq[Int], state:Option[Int]) => {
val currentCount = values.sum // 累加当前批次单词
val previousCount = state.getOrElse(0) //获取上一批次单词,默认为0
Some(currentCount+previousCount) // 加和
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("StreamKafka")
val ssc = new StreamingContext(conf, Seconds(10)) // 设置10秒统计一次
ssc.checkpoint("hdfs://localhost:9000/spark-ck") // 设置检查点
val kafkaTopics = Array("topictest") // 设置使用的kafka topic 可以是多个
// kafka 的 配置
val kafkaParams = Map[String, Object] (
"bootstrap.servers" -> "ffzs-ub:9092",
"key.deserializer"->classOf[StringDeserializer], // 设置 key ,value 反序列化使用的类
"value.deserializer"->classOf[StringDeserializer],
"group.id"->"1", // 设置消费者组ID,ID相同的消费者属于同一组
"enable.auto.commit"->(false:java.lang.Boolean) // kafka不自动提交偏移量,通过spark操作, 默认是true
)
// 创建DStream
val inputStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
Subscribe[String, String](kafkaTopics,kafkaParams)
)
// 解析取出key和value
val linesDStream = inputStream.map(record => (record.key(), record.value()))
val word = linesDStream.map(_._2)
.flatMap(_.split(" "))
.map(it => (it, 1))
val result = word.updateStateByKey(updateFunc)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
终端打开kafka生产者:
(base) [~/softwares/kafka_2.12-2.8.1]$ kafka-console-producer.sh --broker-list ffzs-ub:9092 --topic topictest >a b b b b bbc c >hello world >ffzs is a good man


![spark进阶(七):spark streaming 用法[上] spark进阶(七):spark streaming 用法[上]](http://www.mshxw.com/aiimages/31/311340.png)
