Sometimes we need to filter, modify, or enrich a stream against a fixed dataset. This can be done by joining a streaming DataFrame with a static DataFrame. A static DataFrame here means an ordinary DataFrame: a dataset read once from a fixed location, such as Hive or HDFS.
Being able to combine streams with static DataFrames makes Spark Structured Streaming's stream processing more flexible and more extensible.
Architecture

The architecture for this scenario looks like this:

[spark structured streaming] -----> [transform] -----> [output] -----> [kafka]
                                         ^
                                         |
                                 [mysql/hive/hdfs]
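
At its core the pattern is short: read the static side once with the batch reader, then join it against the streaming DataFrame. Below is a minimal sketch; the parquet path and join column are hypothetical, not taken from the full example that follows.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamStaticJoinSketch").getOrCreate()

# Static side: loaded once with the batch reader (hypothetical path).
static_df = spark.read.parquet("/data/dim/words")

# Streaming side: a socket source, as in the full example below.
stream_df = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Join the stream against the static dimension table on a shared column.
joined = stream_df.join(static_df, "value", "left_outer")

Note that a stream-static join like this works directly on the streaming DataFrame (Spark supports inner joins, and left outer joins with the stream on the left); the full example below instead performs the join inside foreachBatch, which also allows writing to several sinks per micro-batch.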
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window


def func1(df):
    # Windowed word count: 10-second windows sliding every 5 seconds.
    wc = df.groupBy(window(df.timestamp, "10 seconds", "5 seconds"), df.word).count()
    wc.write.mode("append").format("json").save("/tmp/sparktest/json/")


def func2(df):
    # Plain word count over the whole micro-batch.
    wc2 = df.groupBy('word').count()
    wc2.write.mode("append").format("csv").save("/tmp/sparktest/csv/")


def func3(df):
    # Build a static DataFrame from external data, used to interact with the
    # stream. In practice it could be read from mysql/hive/hdfs instead of an
    # in-memory list. (Uses the module-level `spark` session.)
    list_data = [("this", "1"), ("that", "2")]
    schema = ["word", "col2"]
    df1 = spark.createDataFrame(list_data, schema)
    # Aggregate the streaming micro-batch into a DataFrame.
    wc = df.groupBy(window(df.timestamp, "10 seconds", "5 seconds"), df.word).count()
    # Join the streaming aggregate with the static DataFrame on "word".
    res_df = wc.join(df1, "word", "left_outer")
    # Sink the join result.
    res_df.write.mode("append").format("json").save("/tmp/sparktest/joinres/")


func_list = [func1, func2, func3]


# Main loop: called once per micro-batch. The batch is cached so the
# three functions can reuse it without recomputation.
def foreach_batch_function(df, epoch_id):
    df.persist()
    for f in func_list:
        f(df)
    df.unpersist()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    host = sys.argv[1]
    port = int(sys.argv[2])

    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    lines = spark \
        .readStream \
        .format('socket') \
        .option('host', host) \
        .option('port', port) \
        .option('includeTimestamp', 'true') \
        .load()

    # Split the lines into words, keeping the ingest timestamp.
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Run multiple actions per micro-batch via foreachBatch.
    query = words.writeStream \
        .foreachBatch(foreach_batch_function).start()
    query.awaitTermination()
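
To try it, start a socket source in one terminal with nc -lk 9999, then submit the script, for example spark-submit structured_network_wordcount.py localhost 9999, and type words into the nc session. Each micro-batch is written under /tmp/sparktest/json/, /tmp/sparktest/csv/ and /tmp/sparktest/joinres/.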
Summary

Joining with a static DataFrame greatly extends what Spark Structured Streaming can do, and widens the range of scenarios it can be applied to.