栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Pyspark 2.4.0,使用读取流从kafka读取avro-Python

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Pyspark 2.4.0,使用读取流从kafka读取avro-Python

您可以包括spark-avro软件包,例如使用

--packages
(调整版本以匹配spark安装):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

并提供您自己的包装器:

from pyspark.sql.column import Column, _to_java_columndef from_avro(col, jsonFormatSchema):     sc = SparkContext._active_spark_context     avro = sc._jvm.org.apache.spark.sql.avro    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro    return Column(f(_to_java_column(col), jsonFormatSchema))def to_avro(col):     sc = SparkContext._active_spark_context     avro = sc._jvm.org.apache.spark.sql.avro    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro    return Column(f(_to_java_column(col)))

用法示例(从官方测试套件中采用):

from pyspark.sql.functions import col, structavro_type_struct = """{  "type": "record",  "name": "struct",  "fields": [    {"name": "col1", "type": "long"},    {"name": "col2", "type": "string"}  ]}"""df = spark.range(10).select(struct(    col("id"),    col("id").cast("string").alias("id2")).alias("struct"))avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))avro_struct_df.show(3)+----------+|      avro|+----------+|[00 02 30]||[02 02 31]||[04 02 32]|+----------+only showing top 3 rowsavro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)+------------------------------------------------+|from_avro(avro, struct<col1:bigint,col2:string>)|+------------------------------------------------+|         [0, 0]||         [1, 1]||         [2, 2]|+------------------------------------------------+only showing top 3 rows


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651516.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号