栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark structured stream的Update模式

spark structured stream的Update模式

spark的update模式的定义为:自动上一次trigger以来有变化的key都会输出到kafka sink中。

下面的例子完整实现一个从kafka消费并聚合消息,然后把聚合消息写入到目标kafka的完整过程:

from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
# import builtins as py_builtin
from pyspark.sql.functions import max
from pyspark.sql.functions import desc
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.types import *
from pyspark.sql.functions import col, column, expr
from pyspark.sql.functions import *
from pyspark.sql import Row

appname = "test"  # 任务名称
master = "local[*]"  # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"

try:
    conf = SparkConf().setAppName(appname). 
        set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') 
        .set("spark.jars.repositories", 'http://maven.aliyun.com/nexus/content/groups/public/') 
        .setMaster(master)  # 本地
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    df = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "localhost:9092") 
        .option("startingOffsets", "latest") 
        .option("subscribe", "mykafkatest") 
        .load()
    words = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset", "timestamp")
    schema = StructType() 
        .add("topic", StringType()) 
        .add("age", StringType()) 
 
    # 通过from_json,定义schema来解析json
    # res = words.where(instr("value", '{') ==0).select(from_json("value", schema).alias("data")).select("data.*")
    streamSource = words.where(instr("value", 'topic') > 0).select(from_json("value", schema).alias("data"),
                                                                   col("timestamp")) 
        .select("data.*", 'timestamp')
    streamSource = streamSource.select(col('age').cast('int').alias('age'), col('topic'), col('timestamp'))

    windowedCounts = streamSource.withWatermark("timestamp", "10 seconds") 
        .groupBy(window(col("timestamp"), '10 seconds', '10 seconds'), col("topic")).count()

    # query = windowedCounts 
    #     .writeStream 
    #     .outputMode('complete') 
    #     .format('console') 
    #     .option('truncate', 'false') 
    #     .start()
    res = windowedCounts.withColumn('constfield', lit('1'))
    query = res.select(
        to_json(struct("topic", "window")).cast('string').alias("key"),
        to_json(struct("topic", "window", "count")).cast('string').alias("value"))
        .writeStream 
        .outputMode("update") 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "localhost:9092") 
        .option("topic", "mykafkatestdest") 
        .option("checkpointLocation", '''D:sparksparkspark-2.3.0-bin-hadoop2.7checkpointkafkatest3''') 
        .trigger(processingTime='3 seconds') 
        .start()
    query.awaitTermination()

    print('计算成功!')
except:
    traceback.print_exc()  # 返回出错信息
    print('连接出错!')

运行结果:

总结:
update输出模式的情况下输出的记录会有重复,每个trigger内有变的记录都会输出到kafka 目标sink中

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761157.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号