栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Structured Streaming总体实现流程

Spark Structured Streaming总体实现流程

Spark Structured Streaming总体实现流程

我们根据一个简单的例子来查看一下Spark Structured Streaming的总体实现流程。

一个简单的Structured Streaming的例子
import pyspark.sql.functions as F

lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", 10002) 
    .load()

# 处理数据
words = lines.select(
   F.explode(
       F.split(lines.value, " ")
   ).alias("word")
)

# 单词计数
wordCounts = words.groupBy("word").count()

# 打印到终端
query = wordCounts 
    .writeStream 
    .outputMode("complete") 
    .format("console") 
    .start()
总体流程 1.创建DataStreamReader

在sparksession中调用readstream函数,这样这样可以得到一个DataStreamReader对象。

def readStream: DataStreamReader = new DataStreamReader(self)

(1)调用DataStreamReader.load()函数开始获取数据源的数据,并把数据保存成Dataframe。

(2)load()函数调用DataSource.lookupDataSource来获取数据源的类对象,并通过返回的类对象来创建数据源对象。可以支持多种数据源对象,比如:kafka、各种文件格式orc等。要注意,socket只是一个实验性质的实现,不能用于生产环境。

(3)根据sparksession的选项(微批,还是持续流(默认))。若是微批,则调用对应的MicroBatchReadSupport实现类的createMicroBatchReader来创建数据源读取对象,若是kafka则会创建:KafkaMicroBatchReader对象。

(4)根据创建的对象,来创建Dataframe:Dataset.ofRows(…)

此时实际上是创建了一个查询计划,后面的各种操作都会基于该执行计划来进行计划的添加。

2.启动流查询:start()

启动流的查询和处理是在dataset被创建完成后进行的写数据流中进行的,其实就是调用:Dataset#writeStream函数。该函数返回一个DataStreamWriter对象。

当调用DataStreamWriter#start()函数时,就开始执行流数据的读取和处理。start()函数会根据source的不同而进行不同的处理。source的类型主要有:

  • memory
  • foreach
  • foreachBatch
  • 非以上三种类型(一般模式)

start()函数的总体流程如下:

当调用DataStreamWriter#start()时会根据以创建的dataframe,调用startQuery开始流数据的获取和处理。

(1)创建数据源读取的对象。根据不同的模式创建的流读取对象也不同。比如:微批的kafka数据读取类为:KafkaMicroBatchReader等等。

(2)读取sparksession的配置选项

(3)调用df.sparkSession.sessionState.streamingQueryManager.startQuery()开始流数据的读取和处理。

3.streamingQueryManager.startQuery()的总体处理逻辑

(1)创建一个query = StreamingQueryWrapper(MicroBatchExecution…)对象,若是continuious模式,会创建StreamingQueryWrapper(new ContinuousExecution())对象。

(2)调用query.streamingQuery.start(),来启动数据处理。

(3)启动QueryExecutionThread线程,运行runStream()函数,在该函数中会调用runActivatedStream函数。

(4)runActivatedStream有两种实现方式,一种是微批:此时运行MicroBatchExecution#runActivatedStream()函数;一种是连续流:执行ContinuousExecution#runActivatedStream函数。

(5)调用ProcessingTimeExecutor#execute函数,该函数会进入一个while(true){…}的循环中,并间隔一定的毫秒数,运行 triggerHandler函数。

(6)triggerHandler函数会创建一个Dataset,并调用Dataset#collect()来触发计算Dataset的查询计划的执行。collectI()函数只会触发任务的执行,不会把实际的数据获取到driver端。

小结

​ 本文分析了Spark Structured Streaming的总体实现流程。通过本文的分析可以对SSS的代码实现有一个大致的脉络,可以按照这个框架再去细看某一个实现的部分。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581530.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号