栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark进阶(八): spark streaming用法[下] Structured Streaming使用

spark进阶(八): spark streaming用法[下] Structured Streaming使用

Spark 2.0产生了一个新的流处理框架Structured Streaming(结构化流),它是一个可伸缩的、容错的流处理引擎,构建在Spark SQL引擎之上。使用StructuredStreaming可以在静态数据(Dataset/Dataframe)上像批处理计算一样进行流式计算。随着数据的不断到达,Spark SQL引擎会增量地、连续地对其进行处理,并更新最终结果。

简单来讲就是DSteam是基于RDD的DSteam,Structured Streaming是基于Dataset(Dataframe)的。

默认情况下,Structured Streaming使用微批处理引擎将数据流作为一系列小批次作业进行处理,从而实现端到端的延迟低至100毫秒。而自Spark 2.3以来,引入了一种新的低延迟处理模式,称为连续处理,它将端到端的延迟进一步降低至1毫秒。对于开发者来说,不需要考虑是流式计算还是批处理,只要以同样的方式编写计算操作即可,Structured Streaming在底层会自动实现快速、可伸缩、容错等处理。

一、简单使用

添加依赖:


  org.apache.spark
  spark-sql-kafka-0-10_2.12
  3.1.2

测试代码:

object StructuredStreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // sparkSession
    val spark = SparkSession.builder
      .appName("StructuredStreamingExample")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val values = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topictest")
      .load()
      .selectExpr("CAST(value AS STRING)")   // 通过SQL表达式转化数据为字符串格式
      .as[String]   // 转化为DataSet

    val wordCounts = values.flatMap(_.split(" ")).groupBy("value").count()  //通过聚合统计个数

    val query = wordCounts.writeStream
      .outputMode("complete")  // 输出模式
      .format("console")   // 结果打印
      .option("checkpointLocation", "hdfs://localhost:9000/kafka-ck")  // 设置检查点
      .start()

    query.awaitTermination()   // 等待查询终止
  }
}
  • 创建本地SparkSession
  • 创建一个流式Dataframe链接kafka,这里的表只有一列,命名为value
  • 通过as[String]转化为DataSet
  • 然后通过聚合做出现单词的统计
输出模式
  • 完全模式(Complete Mode):更新后的整个结果表将被写入外部存储。如何处理整个表的写入由存储连接器决定。
  • 追加模式(Append Mode):默认模式。自上次触发后,只将结果表中追加的新行写入外部存储。这只适用于已经存在于结果表中的现有行不期望被改变的查询,如select、where、map、fatMap、filter、join等操作支持该模式。
  • 更新模式(Update Mode):只有自上次触发后在结果表中更新(包括增加)的行才会写入外部存储(自Spark 2.1.1起可用)。这与完全模式不同,该模式只输出自上次触发以来更改的行。如果查询不包含聚合,就等同于追加模式。
外部存储

1.文件

输出文件到指定目录,只支持追加模式。

val query = wordCounts.writeStream
  .format("parquet")
  .option("path", "hdfs://localhost:9000/structuredWordCount")
  .start()

2.kafka

将计算结果输出到topic中。

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "topictest")
  .option("checkpointLocation", "hdfs://localhost:9000/kafka-ck1")
  .start()

3.控制台

通过控制台进行打印。

val query1 = wordCounts.writeStream
  .outputMode("complete")  // 输出模式, complete:输出所有内容, append:新增的行输出,update:更新的行输出
  .format("console")   // 结果打印
  .option("checkpointLocation", "hdfs://localhost:9000/kafka-ck")  // 设置检查点
  .start()

4.内存

将计算结果作为内存中的表存储在内存中,用于小量数据的调试。

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("wordCount")
  .option("checkpointLocation", "hdfs://localhost:9000/kafka-ck1")
  .start()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307718.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号