Spark 2.0产生了一个新的流处理框架Structured Streaming(结构化流),它是一个可伸缩的、容错的流处理引擎,构建在Spark SQL引擎之上。使用StructuredStreaming可以在静态数据(Dataset/Dataframe)上像批处理计算一样进行流式计算。随着数据的不断到达,Spark SQL引擎会增量地、连续地对其进行处理,并更新最终结果。
简单来讲就是DSteam是基于RDD的DSteam,Structured Streaming是基于Dataset(Dataframe)的。
默认情况下,Structured Streaming使用微批处理引擎将数据流作为一系列小批次作业进行处理,从而实现端到端的延迟低至100毫秒。而自Spark 2.3以来,引入了一种新的低延迟处理模式,称为连续处理,它将端到端的延迟进一步降低至1毫秒。对于开发者来说,不需要考虑是流式计算还是批处理,只要以同样的方式编写计算操作即可,Structured Streaming在底层会自动实现快速、可伸缩、容错等处理。
一、简单使用添加依赖:
org.apache.spark spark-sql-kafka-0-10_2.12 3.1.2
测试代码:
object StructuredStreaming {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
// sparkSession
val spark = SparkSession.builder
.appName("StructuredStreamingExample")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val values = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topictest")
.load()
.selectExpr("CAST(value AS STRING)") // 通过SQL表达式转化数据为字符串格式
.as[String] // 转化为DataSet
val wordCounts = values.flatMap(_.split(" ")).groupBy("value").count() //通过聚合统计个数
val query = wordCounts.writeStream
.outputMode("complete") // 输出模式
.format("console") // 结果打印
.option("checkpointLocation", "hdfs://localhost:9000/kafka-ck") // 设置检查点
.start()
query.awaitTermination() // 等待查询终止
}
}
- 创建本地SparkSession
- 创建一个流式Dataframe链接kafka,这里的表只有一列,命名为value
- 通过as[String]转化为DataSet
- 然后通过聚合做出现单词的统计
- 完全模式(Complete Mode):更新后的整个结果表将被写入外部存储。如何处理整个表的写入由存储连接器决定。
- 追加模式(Append Mode):默认模式。自上次触发后,只将结果表中追加的新行写入外部存储。这只适用于已经存在于结果表中的现有行不期望被改变的查询,如select、where、map、fatMap、filter、join等操作支持该模式。
- 更新模式(Update Mode):只有自上次触发后在结果表中更新(包括增加)的行才会写入外部存储(自Spark 2.1.1起可用)。这与完全模式不同,该模式只输出自上次触发以来更改的行。如果查询不包含聚合,就等同于追加模式。
1.文件
输出文件到指定目录,只支持追加模式。
val query = wordCounts.writeStream
.format("parquet")
.option("path", "hdfs://localhost:9000/structuredWordCount")
.start()
2.kafka
将计算结果输出到topic中。
val query = wordCounts.writeStream
.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "topictest")
.option("checkpointLocation", "hdfs://localhost:9000/kafka-ck1")
.start()
3.控制台
通过控制台进行打印。
val query1 = wordCounts.writeStream
.outputMode("complete") // 输出模式, complete:输出所有内容, append:新增的行输出,update:更新的行输出
.format("console") // 结果打印
.option("checkpointLocation", "hdfs://localhost:9000/kafka-ck") // 设置检查点
.start()
4.内存
将计算结果作为内存中的表存储在内存中,用于小量数据的调试。
val query = wordCounts.writeStream
.outputMode("complete")
.format("memory")
.queryName("wordCount")
.option("checkpointLocation", "hdfs://localhost:9000/kafka-ck1")
.start()


![spark进阶(八): spark streaming用法[下] Structured Streaming使用 spark进阶(八): spark streaming用法[下] Structured Streaming使用](http://www.mshxw.com/aiimages/31/307718.png)
