Spark Streaming 是核心 Spark API 的扩展,它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源(如 Kafka、Kinesis 或 TCP 套接字)获取,并且可以使用复杂的算法进行处理,这些算法由 map、reduce、join 和 window 等高级函数表示。最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。事实上,你可以在数据流上应用 Spark 的机器学习和图形处理算法。
在内部,它的工作原理如下。 Spark Streaming 接收实时输入的数据流,并将数据分成批处理,然后由 Spark 引擎处理以批处理生成最终的结果流。
Spark Streaming 提供了一种称为离散流或 DStream 的高级抽象,它表示连续的数据流。 DStreams 可以从来自 Kafka 和 Kinesis 等来源的输入数据流创建,也可以通过在其他 DStreams 上应用高级操作来创建。在内部,DStream 表示为一系列 RDD。
本指南向您展示如何开始使用 DStreams 编写 Spark Streaming 程序。您可以使用 Scala、Java 或 Python(在 Spark 1.2 中引入)编写 Spark Streaming 程序,所有这些都在本指南中进行了介绍。您将在本指南中找到可让您在不同语言的代码片段之间进行选择的选项卡。
注意:有一些 API 在 Python 中不同或不可用。在本指南中,您会发现标记 Python API 突出显示了这些差异。
示例1在我们详细介绍如何编写自己的 Spark Streaming 程序之前,让我们快速了解一个简单的 Spark Streaming 程序是什么样的。假设我们要计算从侦听 TCP 套接字的数据服务器接收的文本数据中的单词数。您需要做的就是如下
首先,我们将 Spark Streaming 类的名称和一些来自 StreamingContext 的隐式转换导入到我们的环境中,以便我们使用DStream 的api。 StreamingContext 是所有流功能的主要入口点。我们创建了一个具有两个执行线程的本地 StreamingContext,批处理间隔为 1 秒。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations .seconds(1));
使用创建的StreamingContext我们可以创建一个监听本地端口9999的TCP数据源
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Create a DStream that will connect to hostname:port, like localhost:9999 JavaReceiverInputDStreamlines = jssc.socketTextStream("localhost", 9999);
这行 DStream 表示将从数据服务器接收的数据流。此 DStream 中的每条记录都是一行文本。接下来,我们要按空格字符将行拆分为单词。
这行代码返回一个从数据服务器接收数据流的DStream ,
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap 是一个一对多的 DStream 操作,它通过从源 DStream 中的每个记录生成多个新记录来创建一个新的 DStream。在这种情况下,每行将被拆分为多个单词,单词流表示为单词 DStream。接下来,我们要计算这些单词。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
单词 DStream 进一步映射(一对一转换)到 (word, 1) 对的 DStream,然后将其聚合以获得每批数据中单词的频率。最后, wordCounts.print() 将打印一些每秒生成的计数。
请注意,当执行这些行时,Spark Streaming 仅设置它在启动时将执行的计算,还没有真正的处理开始。要在所有转换设置后开始处理,我们最后调用
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate基本概念 Initializing Streaming Context
StreamContext由sparkConf创建
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
其中appName 是程序在集群中运行的名称,master参数可以试spark集群或者k8s yarn集群的url。本地测试可以使用local[*] 其中还 * 代表的是核心数,可以自己制定也可以让程序自动计算核心数。我们也可以使用ssc获取sparkContext
核心数的设置需要根据实际情况来设定,可以参考性能调优部分文档
StreamingContext 对象也可以从现有的 SparkContext 对象创建。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
定义上下文后,您可以执行以下操作。
1 通过sparkStream创建数据源的Dstreams
2 通过算子来计算想要得到DStreams 以及最终落地的数据流
3 开始接收数据并使用streamingContext.start() 处理它。
4 使用streamingContext.awaitTermination() 等待处理停止(手动或由于任何错误)。
5 可以使用streamingContext.stop() 手动停止处理。
一旦上下文启动,就不能设置或添加新的流计算。
上下文一旦停止,就无法重新启动。
一个 JVM 中只能同时激活一个 StreamingContext。
StreamingContext 调用stop()同时也会停止SparkContext,如果你只想停止StreamingContext ,在调用stop方法的时候添加参数flase,
上整个上线文中只能存在一个StreamingContext ,如果你想创一个新的必须调用stop()停止上一个,然后使用sparkContext来创建。
Discretized Stream 或 DStream 是 Spark Streaming 提供的基本抽象。它表示一个连续的数据流,可以是从源接收到的输入数据流,也可以是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列连续的 RDD 表示,这是 Spark 对不可变的分布式数据集的抽象(更多详细信息请参阅 Spark 编程指南)。 DStream 中的每个 RDD 都包含来自某个区间的数据,如下图所示。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gWXlZo7J-1632886080105)(http://spark.apache.org/docs/latest/img/streaming-dstream.png)]
对 DStream 应用的任何操作都会转换为对底层 RDD 的操作。例如,在前面将行流转换为单词的示例中,对行 DStream 中的每个 RDD 应用 flatMap 操作以生成单词 DStream 的 RDD。这如下图所示。
这些底层 RDD 转换由 Spark 引擎计算。 DStream 算子操作隐藏了大部分细节,并为开发人员提供了更高级别的 API 以方便使用。这些操作将在后面的章节中详细讨论。
Input DStreams and Receivers(输入流和接收器)Input DStreams 是表示从流源接收的输入数据流的 DStreams。在上面例子中,lines 是一个输入 DStream,因为它代表了从 netcat 服务器接收到的数据流。每个输入 DStream(除了文件流,本节稍后讨论)都与一个 Receiver(Scala doc、Java doc)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以供处理。
Spark Streaming 提供了两类内置的流媒体源。
基本来源:直接在 StreamingContext API 中可用的来源。示例:文件系统和套接字连接。
高级来源:Kafka、Kinesis 等来源可通过额外的实用程序类获得。这些需要链接额外的依赖项,如链接部分所述。
我们将在本节后面讨论每个类别中存在的一些来源。
请注意,如果您想在您的sparkStream应用程序中并行接收多个数据流,您可以创建多个输入 DStreams(在性能调优部分进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker/executor 是一个长时间运行的任务,因此它占用了分配给 Spark Streaming 应用程序的核心之一。因此,重要的是要记住 Spark Streaming 应用程序需要分配足够的内核(或线程,如果在本地运行)来处理接收到的数据,以及运行接收器。
Points to remember在本地运行 Spark Streaming 程序时,不要使用“local”或“local[1]”作为主 URL。这两者都意味着只有一个线程将用于在本地运行任务。如果您使用基于接收器(例如套接字、Kafka 等)的输入 DStream,那么将使用单线程运行接收器,不留下任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为主 URL,其中 n > 要运行的接收器数量(有关如何设置主服务器的信息,请参阅 Spark 属性)。
将逻辑扩展到在集群上运行,分配给 Spark Streaming 应用程序的内核数必须大于接收器数。否则系统只能接收数据,无法处理它。
Basic Sources我们已经查看了示例中的 ssc.socketTextStream(…),该示例根据通过 TCP 套接字连接接收的文本数据创建 DStream。除了套接字之外,StreamingContext API 还提供了读取文件创建 DStream 的方法。
File Streams为了从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 创建 DStream。
文件流不需要运行接收器,因此不需要分配任何内核来接收文件数据。
对于简单的文本文件,最简单的方法是 StreamingContext.textFileStream(dataDirectory)。dataDirectory代表本地文件路径或者hdfs文件路径
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
//文本文件 streamingContext.textFileStream(dataDirectory)How Directories are Monitored(监控目录)
Spark Streaming 将监视目录 (dataDirectory )并处理在该目录中创建的任何文件。
Using Object Stores as a source of data()1 可以监控一个简单的目录,比如“hdfs://namenode:8040/logs/”。直接在此类路径下的所有文件将在发现时进行处理。
2 可以提供 POSIX glob 模式,例如“hdfs://namenode:8040/logs/2017/”。在这里,DStream 将包含与模式匹配的目录中的所有文件。也就是说:它是一种目录模式,而不是目录中的文件。
3 所有文件必须采用相同的数据格式。
4 文件被视为基于其修改时间而非创建时间的时间段的一部分。
5 处理后,对当前窗口中的文件所做的更改不会导致重新读取该文件。即:忽略更新。
6 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
7 如果使用通配符标识目录,例如“hdfs://namenode:8040/logs/2016-”,则重命名整个目录以匹配路径会将目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
8 调用 FileSystem.setTimes() 来修复时间戳是一种让文件在以后的窗口中被拾取的方法,即使它的内容没有改变。
Streams based on Custom Receivers(自定义数据接收器)“完整”文件系统(例如 HDFS)倾向于在创建输出流后立即对其文件设置修改时间。当一个文件被打开时,甚至在数据被完全写入之前,它可能被包含在 DStream 中——之后在同一窗口内对文件的更新将被忽略。也就是说:可能会错过更改,并且从流中丢失了数据。
为确保在窗口中获取更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将作为新数据。
相比之下,Amazon S3 和 Azure Storage 等对象存储通常具有较慢的重命名操作,因为数据实际上是复制的。此外,重命名的对象可能将 rename() 操作的时间作为其修改时间,因此可能不被视为原始创建时间暗示它们是的窗口的一部分。
需要针对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与 Spark Streaming 预期的一致。直接写入目标目录可能是通过所选对象存储流式传输数据的适当策略。
有关此主题的更多详细信息,请参阅 Hadoop 文件系统规范。
可以使用通过自定义接收器接收的数据流创建 DStream。有关更多详细信息,请参阅自定义接收器指南。
Queue of RDDs as a Stream为了使用测试数据测试 Spark Streaming 应用程序,还可以使用 streamingContext.queueStream(queueOfRDDs) 创建基于 RDD 队列的 DStream。每个推入队列的 RDD 都会在 DStream 中被视为一批数据,并像流一样进行处理。
有关来自套接字和文件的流的更多详细信息,请参阅 StreamingContext for Scala、JavaStreamingContext for Java 和 StreamingContext for Python 中相关函数的 API 文档。
Advanced Sources(高级数据源)Python API 从 Spark 3.1.2 开始,Python API 中提供了 Kafka 和 Kinesis。
这类数据源需要与外部非 Spark 库接口,其中一些具有复杂的依赖关系(例如,Kafka)。因此,为了尽量减少与依赖项的版本冲突相关的问题,从这些源创建 DStream 的功能已移至单独的库中,必要时可以显式链接到这些库。
请注意,这些高级源在 Spark shell 中不可用,因此无法在 shell 中测试基于这些高级源的应用程序。如果您真的想在 Spark shell 中使用它们,则必须下载相应的 Maven 工件的 JAR 及其依赖项,并将其添加到类路径中。
其中一些高级来源如下。
Kafka:Spark Streaming 3.1.2 与 Kafka 代理版本 0.10 或更高版本兼容。有关更多详细信息,请参阅 Kafka 集成指南。
Kinesis:Spark Streaming 3.1.2 与 Kinesis Client Library 1.2.1 兼容。有关更多详细信息,请参阅 Kinesis 集成指南。
Custom Sources(自定义数据源)Python API 这在 Python 中尚不支持。
DStream 也可以从自定义数据源中创建。您所要做的就是实现一个用户定义的接收器(请参阅下一节以了解它是什么),它可以从自定义源接收数据并将其推送到 Spark。有关详细信息,请参阅自定义接收器指南。
Receiver Reliability(接收可靠性)保证其可靠性,可以选着可靠的数据源。如 (Kafka)允许确认传输的数据。从这些可靠的数据来源可以确认数据流是否处理数据,就可以确保不会因任何类型的故障而丢失数据。目前两种类型的接收器:
Transformations on DStreams(算子)1Reliable Receiver - 当sparkStreams接收到数据并消费成功后,向数据源发送消息确认,确保消息被消费。
2不可靠的接收器 - 不可靠的接收器不会向源发送确认。这可以用于不支持确认的源,或者甚至当人们不想或不需要进入确认的复杂性时的可靠源。
自定义接收器指南中讨论了如何编写可靠接收器的详细信息。
与 RDD 类似,允许转换修改来自输入 DStream 的数据。 DStreams 支持许多普通 Spark RDD 上可用的转换。一些常见的如下
| Transformation | Meaning |
| map(func) | 调用func循环处理DStream中的每个元素,返回处理后的DStream |
| flatMap(func) | 和map方法类型,每个输入项可以映射为0或者多个项 |
| filter(func) | 筛选出func返回true的Dstream |
| repartition(numPartitions) | 增加或者减少分区数,从而达到调整并行数 |
| union(otherStream) | 返回一个新的DStream他其中包括了源DStream和otherDStream的并集 |
| count() | 通过计算源 DStream 的每个 RDD 中的元素数量,返回一个新的单元素 RDD 的 DStream。 |
| reduce(func) | 通过使用函数 func(它接受两个参数并返回一个)聚合源 DStream 中的每个 RDD 中的元素,返回一个新的单元素 RDD 的 DStream。该函数应该是关联的和可交换的,以便它可以并行计算。 |
| countByValue() | 当在类型为 (K,V) 的元素的 DStream 上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是它在源 DStream 的每个 RDD 中的频率 |
| reduceByKey(func, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中使用给定的 reduce 函数聚合每个键的值。 注意: 默认情况下,这使用 Spark 的默认并行任务数量(本地模式为 2,在集群模式下,数量由配置属性 spark.default.parallelism 决定)进行分组。您可以传递可选的 numTasks 参数来设置不同数量的任务。 |
| join(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的两个 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中包含每个键的所有元素对。 |
| cogroup(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回 (K, Seq[V], Seq[W]) 元组的新 DStream。 |
| transform(func) | 通过func将 RDD-to-RDD 函数应用于源 DStream 的每个 RDD,返回一个新的 DStream。这可用于在 DStream 上执行任意 RDD 操作 |
| updateStateByKey(func) | 1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。 2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。 注意 使用到updateStateByKey要开启checkpoint机制和功能。 多久会将内存中的数据写入到磁盘一份? 如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份 |
updateStateByKey 操作允许您保持任意状态,同时使用新信息不断更新它。要使用它,您必须执行两个步骤。
定义状态 - 状态可以是任意数据类型。
定义状态更新函数 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。
在每个批次中,Spark 都会对所有现有的键应用状态更新功能,无论它们是否有新的批次数据。如果更新函数返回 None 则键值对将被消除。
让我们用一个例子来说明这一点。假设您要维护在文本数据流中看到的每个单词的运行计数。这里,运行计数是状态,它是一个整数。我们将更新函数定义为:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // 将新值与先前的运行计数相加以获得新计数
Some(newCount)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
将为每个单词调用更新函数,newValues 具有 1 的序列(来自 (word, 1) 对),而 runningCount 具有以前的计数值。
请注意,使用 updateStateByKey 必须要配置检查点目录,这将在检查点部分详细讨论。
Transform Operation(转换算子)转换操作(连同它的变体,如transformWith)允许将任意的RDD-to-RDD函数应用于DStream。它可用于应用任何未在 DStream API 中公开的 RDD 操作。例如,将数据流中的每个批次与另一个数据集连接的功能并未直接在 DStream API 中公开。但是,您可以轻松地使用转换来执行此操作。这实现了非常强大的可能性。例如,可以通过将输入数据流与其他预先计算好的DStream信息(也可能由 Spark 生成)连接,然后根据它进行过滤来进行实时数据清理。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // 加入以及处理好的数据流进行过滤
...
}
请注意,提供的函数在每个批处理间隔中都会被调用。这允许你改变RDD,即RDD操作,改变分区数,增加广播变量等可以在batch之间改变。
Window OperationsSpark Streaming 还提供窗口计算,允许您在滑动窗口上进行数据转换。下图说明了这个滑动窗口。
如图所示,每次窗口滑过一个源 DStream 时,落入窗口内的源 RDD 被组合并操作以产生窗口化 DStream 的 RDD。在这种特定情况下,该操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这说明任何窗口操作都需要指定两个参数。
窗口长度 - 窗口的持续时间(图中 3)。
滑动间隔 - 执行窗口操作的间隔(图中为 2)。
这两个参数必须是源DStream的batch间隔的倍数(图中为1)。
让我们用一个例子来说明窗口操作。假设您想通过每 10 秒生成过去 30 秒数据的字数来扩展前面的示例。为此,我们需要对过去 30 秒的数据 (word, 1) 对的 DStream 进行reduceByKey 操作。这里可以使用 reduceByKeyAndWindow 操作完成的。
// 每10秒统计30s的单词出现频率 val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
下面介绍一些常用的窗口函数
| Transformation | Meaning |
|---|---|
| window(windowLength, slideInterval) | 返回一个新的 DStream,它是根据源 DStream 的窗口进行批次计算的。 |
| countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口计数。 |
| reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素流,它是通过使用 func 在滑动间隔内聚合流中的元素而创建的。该函数应该是关联的和可交换的,以便它可以被正确地并行计算。 |
| reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对 DStream,其中每个键的值使用给定的 reduce 函数 func 在滑动窗口中按批次聚合。 注意: 默认情况下,这使用 Spark 的默认并行任务数量(本地模式为 2,在集群模式下,数量由配置属性 spark.default.parallelism 决定)进行分组。您可以传递可选的 numTasks 参数来设置不同数量的任务。 |
| reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上面 reduceByKeyAndWindow() 的一个更有效的版本,其中每个窗口的reduce 值是使用前一个窗口的reduce 值递增计算的。这是通过减少进入滑动窗口的新数据,并“逆减少”离开窗口的旧数据来完成的。一个例子是在窗口滑动时“添加”和“减去”键的计数。但是,它只适用于“可逆归约函数”,即那些具有相应“逆归约”函数(作为参数invFunc)的归约函数。就像在 reduceByKeyAndWindow 中一样,reduce 任务的数量可以通过一个可选参数进行配置。请注意,必须启用 checkpointing 才能使用此操作。 |
| countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对 DStream,其中每个键的值是其在滑动窗口内的频率。就像在 reduceByKeyAndWindow 中一样,reduce 任务的数量可以通过一个可选参数进行配置。 |
最后,值得强调的是在 Spark Streaming 中执行不同类型的连接是多么容易。
Design Patterns for using foreachRDD使用foreachRDD必须落地数据,不然会被程序抛弃
默认情况下,输出操作一次执行一次。它们按照在应用程序中定义的顺序执行
Dataframe and SQL Operations(sql操作)在saprkStreams中您可以轻松地使用 Dataframes 和 SQL对流数据进行 操作。在使用前你必须使用当前StreamingContext 中的SparkContext 创建一个SparkSession,下面对word count示例使用sql方法改造,使用Dataframes 和sql来统计单次出现评率,其中每个RDD都会转换成一个Dataframe,并把他们注册为一个临时表,然后使用sql进行查询操作
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 获取一个单例的sparksession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to Dataframe
val wordsDataframe = rdd.toDF("word")
// Create a temporary view
wordsDataframe.createOrReplaceTempView("words")
// Do word count on Dataframe using SQL and print it
val wordCountsDataframe =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataframe.show()
}
您还可以对在来自不同线程的流数据上定义的表运行 SQL 查询(即与正在运行的 StreamingContext 异步)。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步 SQL 查询的 StreamingContext 将在查询完成之前删除旧的流数据。例如,如果您想查询最后一批,但您的查询可能需要 5 分钟才能运行,则调用 streamingContext.remember(Minutes(5)) (在 Scala 中,或其他语言中的等效项)。
请参阅 Dataframes 和 SQL 指南以了解有关 Dataframe 的更多信息。
Caching / Persistence与 RDD 类似,DStreams 也允许开发人员将流的数据保存在内存中。也就是说,在 DStream 上使用 persist() 方法将自动将该 DStream 的每个 RDD 持久化在内存中。如果 DStream 中的数据将被多次计算(例如,对同一数据进行多次操作),这将非常有用。对于像reduceByWindow 和reduceByKeyAndWindow 这样的基于窗口的操作和像updateStateByKey 这样的基于状态的操作,这是隐式的。因此,由基于窗口的操作生成的 DStream 会自动持久保存在内存中,而无需开发人员调用 persist()。
对于通过网络接收数据的输入流(如 Kafka、sockets 等),默认的持久化级别设置为将数据复制到两个节点以实现容错。
请注意,与 RDD 不同,DStreams 的默认持久化级别将数据序列化在内存中。这将在性能调优部分进一步讨论。有关不同持久性级别的更多信息,请参阅 Spark 编程指南。
Checkpointing(检查点)流应用程序必须 7*24全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。为了使这成为可能,Spark Streaming 需要将足够的信息检查点到容错存储系统,以便它可以从故障中恢复。检查点有两种类型的数据。
-
元数据检查点 - 将定义流计算的信息保存到容错存储,如 HDFS。这用于从运行流应用程序驱动程序的节点故障中恢复(稍后详细讨论)。元数据包括:
配置 -用于创建流应用程序的配置。
DStream 操作 - 定义流应用程序的一组 DStream 操作。
不完整的批次 - 作业已排队但尚未完成的批次。
-
数据检查点 - 将生成的 RDD 保存到可靠的存储中。这在一些跨多个批次组合数据的有状态转换中是必要的。在这样的转换中,生成的 RDD 依赖于之前批次的 RDD,这导致依赖链的长度随时间不断增加。为了避免恢复时间的这种无限增加(与依赖链成比例),有状态转换的中间 RDD 会定期检查点到可靠存储(例如 HDFS)以切断依赖链。
总而言之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用有状态转换,即使对于基本功能,数据或 RDD 检查点也是必要的。
必须为具有以下任何要求的应用程序启用检查点:
-
有状态转换的使用 - 如果在应用程序中使用 updateStateByKey 或 reduceByKeyAndWindow(具有反函数),则必须提供检查点目录以允许定期 RDD 检查点。
-
从运行应用程序的驱动程序的故障中恢复 - 元数据检查点用于恢复进度信息。
请注意,没有上述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,驱动程序故障的恢复也将是部分的(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持有望在未来得到改善。
How to configure Checkpointing可以通过在容错、可靠的文件系统(例如 HDFS、S3 等)中设置一个目录来启用检查点,检查点信息将保存到该目录中。这是通过使用 streamingContext.checkpoint(checkpointDirectory) 来完成的。这将允许您使用上述有状态转换。此外,如果您想让应用程序从驱动程序故障中恢复,您应该重写您的流应用程序以具有以下行为
- 当程序第一次启动时,它会创建一个新的 StreamingContext,设置所有的流,然后调用 start()。
- 当程序在失败后重新启动时,它会从检查点目录中的检查点数据重新创建一个 StreamingContext
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果 checkpointDirectory 存在,则将从检查点数据重新创建上下文。如果该目录不存在(即第一次运行),则函数 functionToCreateContext 将被调用以创建新的上下文并设置 DStreams。请参阅 Scala 示例 RecoverableNetworkWordCount。此示例将网络数据的字数附加到文件中。
除了使用 getOrCreate 之外,还需要确保驱动程序进程在失败时自动重新启动。这只能由用于运行应用程序的部署基础结构来完成。这将在部署部分进一步讨论。
请注意,RDD 的检查点会产生保存到可靠存储的成本。这可能会导致 RDD 获得检查点的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(比如 1 秒)下,每批检查点可能会显着降低操作吞吐量。相反,检查点太少会导致谱系和任务大小增加,这可能会产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是批处理间隔的倍数,至少为 10 秒。它可以通过使用 dstream.checkpoint(checkpointInterval) 来设置。通常,DStream 的 5 - 10 个滑动间隔的检查点间隔是一个很好的尝试设置。
Accumulators, Broadcast Variables, and Checkpoints(累加器、广播变量和检查点)无法从 Spark Streaming 中的检查点恢复累加器和广播变量。如果您启用检查点并使用累加器或广播变量,则必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序失败后重新启动后可以重新实例化它们。这在以下示例中显示。
object WordExcludeList {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordExcludeList = Seq("a", "b", "c")
instance = sc.broadcast(wordExcludeList)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("DroppedWordsCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the excludeList Broadcast
val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use excludeList to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (excludeList.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
Deploying Applications(部署引用程序)
本节讨论部署 Spark Streaming 应用程序的步骤。
Requirements(要求)要运行 Spark Streaming 应用程序,您需要具备以下条件:
-
Cluster with a cluster manager - 这是任何 Spark 应用程序的一般要求,并在部署指南中详细讨论。
-
打包应用程序 JAR - 您必须将流式应用程序编译为 JAR。如果您使用 spark-submit 来启动应用程序,那么您将不需要在 JAR 中提供 Spark 和 Spark Streaming。但是,如果您的应用程序使用高级源(例如 Kafka),那么您必须将它们链接到的额外工件及其依赖项打包在用于部署应用程序的 JAR 中。例如,使用 KafkaUtils 的应用程序必须在应用程序 JAR 中包含 spark-streaming-kafka-0-10_2.12 及其所有传递依赖项。
-
为执行器配置足够的内存 - 由于接收到的数据必须存储在内存中,执行器必须配置有足够的内存来保存接收到的数据。请注意,如果您正在进行 10 分钟的窗口操作,则系统必须在内存中保留至少最近 10 分钟的数据。因此,应用程序的内存要求取决于其中使用的操作。
-
配置检查点 - 如果流应用需要它,那么必须将 Hadoop API 兼容容错存储中的一个目录(例如 HDFS、S3 等)配置为检查点目录,并且流应用以检查点信息可以通过的方式编写用于故障恢复。有关更多详细信息,请参阅检查点部分。
-
配置应用程序驱动程序的自动重启——为了从驱动程序故障中自动恢复,用于运行流应用程序的部署基础设施必须监控驱动程序进程并在它失败时重新启动驱动程序。不同的集群管理器有不同的工具来实现这一点。
- Spark Standalone - Spark 应用程序驱动程序可以提交在 Spark Standalone 集群中运行(参见集群部署模式),即应用程序驱动程序本身运行在其中一个工作节点上。此外,可以指示独立集群管理器监督驱动程序,并在驱动程序由于非零退出代码或由于运行驱动程序的节点失败而失败时重新启动它。有关更多详细信息,请参阅 Spark Standalone 指南中的集群模式和监督。
- YARN - Yarn 支持类似的机制来自动重新启动应用程序。有关更多详细信息,请参阅 YARN 文档。
- Mesos - Marathon 已被用来通过 Mesos 实现这一点。
-
配置预写日志 - 从 Spark 1.2 开始,我们引入了预写日志以实现强大的容错保证。如果启用,从接收器接收的所有数据都会写入配置检查点目录中的预写日志。这可以防止驱动程序恢复时丢失数据,从而确保零数据丢失(在容错语义部分详细讨论)。这可以通过将配置参数 spark.streaming.receiver.writeAheadLog.enable 设置为 true 来启用。然而,这些更强的语义可能以单个接收器的接收吞吐量为代价。这可以通过并行运行更多接收器以增加总吞吐量来纠正。此外,建议在启用预写日志时禁用 Spark 中接收数据的复制,因为日志已存储在复制的存储系统中。这可以通过将输入流的存储级别设置为 StorageLevel.MEMORY_AND_DISK_SER 来完成。在使用 S3(或任何不支持刷新的文件系统)进行预写日志时,请记住启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关更多详细信息,请参阅 Spark 流配置。请注意,当启用 I/O 加密时,Spark 不会加密写入预写日志的数据。如果需要对预写日志数据进行加密,则应将其存储在本机支持加密的文件系统中。
-
设置最大接收速率 - 如果集群资源不足以让流应用程序以接收数据的速度处理数据,则可以通过设置以记录/秒为单位的最大速率限制来限制接收器的速率。请参阅接收器的配置参数 spark.streaming.receiver.maxRate 和直接 Kafka 方法的 spark.streaming.kafka.maxRatePerPartition。在 Spark 1.5 中,我们引入了一个称为背压的功能,无需设置此速率限制,因为 Spark Streaming 会自动计算出速率限制并在处理条件发生变化时动态调整它们。可以通过将配置参数 spark.streaming.backpressure.enabled 设置为 true 来启用此背压。
如果需要使用新的应用程序代码升级正在运行的 Spark Streaming 应用程序,则有两种可能的机制。
-
升级后的 Spark Streaming 应用程序启动并与现有应用程序并行运行。一旦新的(接收到与旧的相同的数据)预热并准备好迎接黄金时段,就可以关闭旧的。请注意,这可以用于支持将数据发送到两个目的地的数据源(即较早的和升级的应用程序)。
-
现有应用程序正常关闭(请参阅 StreamingContext.stop(…) 或 JavaStreamingContext.stop(…) 以获取正常关闭选项),以确保在关闭之前完全处理已接收的数据。然后可以启动升级的应用程序,它将从较早的应用程序停止的同一点开始处理。请注意,这只能使用支持源端缓冲的输入源(如 Kafka)来完成,因为数据需要在前一个应用程序关闭且升级的应用程序尚未启动时进行缓冲。并且无法从升级前代码的较早检查点信息重新启动。检查点信息本质上包含序列化的 Scala/Java/Python 对象,尝试使用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级后的应用程序,要么删除之前的检查点目录。
除了 Spark 的监控功能之外,还有其他特定于 Spark Streaming 的功能。使用 StreamingContext 时,Spark Web UI 会显示一个额外的 Streaming 选项卡,其中显示有关正在运行的接收器(接收器是否处于活动状态、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息.)这可用于监视流应用程序的进度。
Web UI 中的以下两个指标特别重要:
- 处理时间 - 处理每批数据的时间。
- 调度延迟 - 一个批次在队列中等待前一批处理完成的时间。
如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表明系统无法像生成批处理一样快地处理批处理,并且正在落后。在这种情况下,请考虑减少批处理时间。
也可以使用 StreamingListener 接口监控 Spark Streaming 程序的进度,该接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员 API,将来可能会对其进行改进(即报告更多信息)。
Performance Tuning(性能调优)从集群上的 Spark Streaming 应用程序中获得最佳性能需要进行一些调整。本节介绍了许多可以调整以提高应用程序性能的参数和配置。在高层次上,您需要考虑两件事:
-
有效利用集群资源,减少每批数据的处理时间。
-
设置正确的批次大小,以便数据批次可以在接收时尽快处理(即数据处理跟上数据摄取)。
可以在 Spark 中进行许多优化以最小化每个批次的处理时间。这些已在
中详细讨论。本节重点介绍一些最重要的内容。
Level of Parallelism in Data Receiving(并行接收数据)通过网络(如 Kafka、socket 等)接收数据需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统的瓶颈,则考虑并行化数据接收。请注意,每个输入 DStream 都会创建一个接收器(在工作机器上运行),用于接收单个数据流。因此,可以通过创建多个输入 DStream 并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个主题数据的单个 Kafka 输入 DStream 可以拆分为两个 Kafka 输入流,每个输入流仅接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个 DStream 可以结合在一起以创建单个 DStream。然后可以将应用于单个输入 DStream 的转换应用于统一流。这是如下完成的。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另一个应该考虑的参数是接收方的块间隔,它由配置参数 spark.streaming.blockInterval 决定。对于大多数接收器,接收到的数据在存储在 Spark 的内存中之前被合并成数据块。每批中的块数决定了将用于在类似map 算子来处理接收到的数据的任务数。每批每个接收器的任务数约为(批间隔/块间隔)。例如,200 ms 的块间隔将每 2 秒创建 10 个任务批次。如果任务数量太少(即小于每台机器的核心数量),那么效率会很低,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是推荐的block interval最小值为50ms左右,低于这个值可能会导致任务启动开销问题。
使用多个输入流/接收器接收数据的另一种方法是显式重新分区输入数据流(使用 inputStream.repartition())。在进一步处理之前,这会将接收到的批次数据分布在集群中指定数量的机器上。
直接流请参考 Spark Streaming + Kafka 集成指南
Level of Parallelism in Data Processing(数据处理中的并行级别)如果在计算的任何阶段使用的并行任务数量不够多,则集群资源可能未得到充分利用。例如,对于reduceByKey 和reduceByKeyAndWindow 等分布式reduce 操作,并行任务的默认数量由spark.default.parallelism 配置属性控制。您可以将并行级别作为参数传递(请参阅 PairDStreamFunctions 文档),或设置 spark.default.parallelism 配置属性来更改默认值。
Data Serialization(数据序列化)通过调整序列化格式可以减少数据序列化的开销。在流的情况下,有两种类型的数据正在被序列化。
-
输入数据:默认情况下,通过 Receivers 接收到的输入数据以 StorageLevel.MEMORY_AND_DISK_SER_2 存储在执行器的内存中。也就是说,数据被序列化为字节以减少 GC 开销,并复制以容忍执行程序故障。此外,数据首先保存在内存中,只有在内存不足以容纳流式计算所需的所有输入数据时才会溢出到磁盘。这种序列化显然有开销——接收方必须反序列化接收到的数据并使用 Spark 的序列化格式重新序列化它。
-
流操作生成的持久化 RDD:流计算生成的 RDD 可以持久化在内存中。例如,窗口操作将数据保存在内存中,因为它们将被多次处理。然而,与 Spark Core 默认的 StorageLevel.MEMORY_onLY 不同,由流计算生成的持久化 RDD 默认使用 StorageLevel.MEMORY_ONLY_SER(即序列化)持久化,以最小化 GC 开销。
在这两种情况下,使用 Kryo 序列化都可以减少 CPU 和内存开销。有关更多详细信息,请参阅 Spark 调优指南。对于 Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅配置指南中的 Kryo 相关配置)。
在流应用程序需要保留的数据量不大的特定情况下,将数据(两种类型)作为反序列化对象持久化而不产生过多的 GC 开销可能是可行的。例如,如果您使用几秒钟的批处理间隔并且没有窗口操作,那么您可以尝试通过相应地显式设置存储级别来禁用持久数据中的序列化。这将减少由于序列化而导致的 CPU 开销,从而潜在地提高性能而不会产生过多的 GC 开销。
Task Launching Overheads(任务启动开销)如果每秒启动的任务数量很高(比如每秒 50 个或更多),那么向执行器发送任务的开销可能很大,并且很难实现亚秒级延迟。可以通过以下更改减少开销:
-
Execution mode:在在Standalone mode或coarse-grained Mesos mode 模式下运行 Spark比在fine-grained Mesos mode 模式下有启动时间会更少,有关更多详细信息,请参阅在 Mesos 上运行指南。
这些更改可能会将批处理时间减少 100 毫秒,从而使亚秒级的批处理大小可行。
为了使在集群上运行的 Spark Streaming 应用程序稳定,系统应该能够在接收数据时尽快处理数据。换句话说,批量数据的处理速度应与生成它们的速度一样快。可以通过在流式 Web UI 中监视处理时间来确定应用程序是否如此,其中批处理时间应小于批处理间隔。
根据流计算的性质,所使用的批处理间隔可能会对应用程序在一组固定集群资源上维持的数据速率产生重大影响。例如,让我们考虑较早的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够每 2 秒(即 2 秒的批处理间隔)跟上报告字数,但不是每 500 毫秒。因此,需要设置批处理间隔,以便能够维持生产中的预期数据速率。
确定应用程序正确批处理大小的一个好方法是使用保守的批处理间隔(例如 5-10 秒)和低数据速率对其进行测试。要验证系统是否能够跟上数据速率,您可以检查每个处理批次所经历的端到端延迟的值(在 Spark 驱动程序 log4j 日志中查找“总延迟”,或使用StreamingListener 接口)。如果延迟保持与批量大小相当,则系统是稳定的。否则,如果延迟不断增加,则表示系统跟不上,因此不稳定。一旦您有了稳定配置的想法,您就可以尝试增加数据速率和/或减少批量大小。请注意,由于临时数据速率增加而导致的延迟瞬间增加可能没问题,只要延迟减少回较低的值(即小于批量大小)。
Memory Tuning(内存调优)调优指南中已经详细讨论了如何调优 Spark 应用程序的内存使用和 GC 行为。强烈建议您阅读该内容。在本节中,我们将专门在 Spark Streaming 应用程序的上下文中讨论一些调整参数。
Spark Streaming 应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对最近 10 分钟的数据使用窗口操作,那么您的集群应该有足够的内存来在内存中保存 10 分钟的数据。或者,如果您想使用具有大量键的 updateStateByKey,那么所需的内存会很高。反之,如果要做简单的map-filter-store操作,那么所需的内存就会很低。
一般情况下,由于通过接收器接收到的数据是用StorageLevel.MEMORY_AND_DISK_SER_2存储的,内存放不下的数据会溢出到磁盘上。这可能会降低流式应用程序的性能,因此建议您根据流式应用程序的需要提供足够的内存。最好尝试在小范围内查看内存使用情况并进行相应估计。
内存调优的另一个方面是垃圾回收。对于需要低延迟的流式应用来说,JVM 垃圾回收导致的处理时间增加是不可取的
有几个参数可以帮助您调整内存使用和 GC 开销:
-
DStreams 的持久化级别:如前面数据序列化部分所述,默认情况下输入数据和 RDD 作为序列化字节持久化。与反序列化持久化相比,这减少了内存使用和 GC 开销。启用 Kryo 序列化可进一步减少序列化大小和内存使用量。可以通过压缩(参见 Spark 配置 spark.rdd.compress)以 CPU 时间为代价进一步减少内存使用。
-
清除旧数据:默认情况下,所有输入数据和 DStream 转换生成的持久化 RDD 都会自动清除。 Spark Streaming 根据使用的转换决定何时清除数据。例如,如果您使用 10 分钟的窗口操作,那么 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据。通过设置streamingContext.remember,可以将数据保留更长的时间(例如交互式查询旧数据)。
-
CMS 垃圾收集器:强烈建议使用并发标记和清除 GC,以减少与 GC 相关的处理时间。尽管已知并发 GC 会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用 spark-submit 中的 --driver-java-options)和执行程序(使用 Spark 配置 spark.executor.extraJavaOptions)上都设置了 CMS GC。
-
其他技巧:为了进一步减少 GC 开销,这里有一些更多的技巧可以尝试。
important points to remember:
- 使用 OFF_HEAP 存储级别持久化 RDD。在 Spark 编程指南中查看更多详细信息。
- 使用更多具有较小堆大小的执行程序。这将降低每个 JVM 堆内的 GC 压力。
-
DStream 与单个接收器相关联。为了实现读取并行性,需要创建多个接收器,即多个 DStream。接收器在执行器中运行。它占据一个核心。确保在预订接收器插槽后有足够的内核进行处理,即 spark.cores.max 应考虑接收器插槽。接收者以循环方式分配给执行者。
-
当从数据源接收数据时,接收器创建数据块。每 blockInterval 毫秒生成一个新的数据块。在batchInterval 期间创建了N 个数据块,其中N = batchInterval/blockInterval。这些块由当前执行器的 BlockManager 分发给其他执行器的块管理器。之后,在驱动程序上运行的网络输入跟踪器会被告知块位置以供进一步处理。
-
在驱动程序上为 batchInterval 期间创建的块创建一个 RDD。 batchInterval 期间生成的块是 RDD 的分区。每个分区都是 spark 中的一个任务。 blockInterval== batchinterval 意味着创建了一个分区,并且可能在本地进行处理。
-
块上的映射任务在具有块的执行器(一个接收块,另一个块被复制的地方)中处理,无论块间隔如何,除非非本地调度开始。具有更大的块间隔意味着更大的块。 spark.locality.wait 的高值会增加在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保在本地处理较大的块。
-
您可以通过调用 inputDstream.repartition(n) 来定义分区数,而不是依赖于 batchInterval 和 blockInterval。这会随机重新排列 RDD 中的数据以创建 n 个分区。是的,为了更大的并行性。虽然是以reshuffles 为代价的。一个 RDD 的处理是由 driver 的 jobscheduler 作为作业来调度的。在给定的时间点,只有一项作业处于活动状态。因此,如果一个作业正在执行,其他作业将排队。
-
如果您有两个 dstream,则会形成两个 RDD,并且会创建两个作业,这些作业将一个接一个地安排。为避免这种情况,您可以合并两个 dstream。这将确保为 dstream 的两个 RDD 形成单个 unionRDD。这个 unionRDD 然后被认为是一个单一的工作。但是,RDD 的分区不受影响。
-
如果批处理时间超过 batchinterval,那么显然接收器的内存将开始填满并最终抛出异常(很可能是 BlockNotFoundException)。目前,没有办法暂停接收器。使用 SparkConf 配置 spark.streaming.receiver.maxRate,可以限制接收器的速率。



