栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Structured Streaming实战--流df和静态df进行join操作

Spark Structured Streaming实战--流df和静态df进行join操作

有时我们可能根据某些固定的数据集对流进行过滤、修改、扩充等操作。此时可以通过使用静态dataframe和流dataframe进行join操作来完成。这里的静态dataframe是指常规的dataframe,也就是从某个固定位置一次性读取的数据集,比如:hive或hdfs。

允许流和静态dataframe进行操作,让spark structured streaming对流处理的功能变得更加灵活,可扩展性更强。

场景架构

这种场景的架构如下图所示:

[spark structured streaming]----->[transform]- —>[output] -----> [kafka]
​ |/
[mysql/hive/hdfs]

完整例子
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window


def func1(df):
    #
    wc = df.groupBy(window(df.timestamp, "10 seconds", "5 seconds"), df.word).count()
    wc.write.mode("append").format("json").save("/tmp/sparktest/json/")


def func2(df):
    #
    wc2 = df.groupBy('word').count()
    wc2.write.mode("append").format("csv").save("/tmp/sparktest/csv/")


def func3(df):
    # 通过读取外部数据,创建静态的dataframe。
    # 用来与流数据进行交互。
    list_data = [("this", "1"), ("that", "2")]
    schema = ["word", "col2"]
    df1 = spark.createDataframe(list_data, schema)

    # 计算流数据得到一个dataframe
    wc = df.groupBy(window(df.timestamp, "10 seconds", "5 seconds"), df.word).count()

    # 使用静态df和流df进行join操作
    res_df = wc.join(df1, "word", "left_outer")
    # sink
    res_df.write.mode("append").format("json").save("/tmp/sparktest/joinres/")


func_list = [func1, func2, func3]

# 主循环
def foreach_batch_function(df, epoch_id):
    df.persist()

    # Transform and write batchDF
    for f in func_list:
        f(df)
    
    df.unpersist()

    
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: structured_network_wordcount.py  ", file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]
    port = int(sys.argv[2])

    spark = SparkSession
        .builder
        .appName("StructuredNetworkWordCount")
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    lines = spark
        .readStream
        .format('socket')
        .option('host', host)
        .option('port', port)
        .option('includeTimestamp', 'true')
        .load()

    # Split the lines into words
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # multiple action
    query = words.writeStream
                      .foreachBatch(foreach_batch_function).start()

    query.awaitTermination()

小结

通过与静态 的dataframe进行join,让spark structured streaming的可扩展性大大增加。让其适用场景也大大增多。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581524.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号