栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

使用Spark DataFrame扩展架构

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Spark DataFrame扩展架构

JSON源不是非常适合具有不断发展的模式的数据(而不是Avro或Parquet),但是简单的解决方案是对所有源使用相同的模式,并使新字段为可选/可为空:

import org.apache.spark.sql.types.{StructType, StructField, LongType}val schema = StructType(Seq(  StructField("A", StructType(Seq(    StructField("B", LongType, true),     StructField("D", LongType, true)  )), true),  StructField("C", LongType, true)))

您可以

schema
像这样传递给
DataframeReader

val rddV1 = sc.parallelize(Seq("{ "A": {"B": 1 } }"))val df1 = sqlContext.read.schema(schema).json(rddV1)val rddV2 = sc.parallelize(Seq("{ "A": {"B": 1 }, "C": 2 }"))val df2 = sqlContext.read.schema(schema).json(rddV2)val rddV3 = sc.parallelize(Seq("{ "A": {"B": 1, "D": 3 }, "C": 2 }"))val df3 = sqlContext.read.schema(schema).json(rddV3)

并且您将获得一个独立于变体的一致结构:

require(df1.schema == df2.schema && df2.schema == df3.schema)

缺少的列会自动设置为

null

df1.printSchema// root//  |-- A: struct (nullable = true)//  |    |-- B: long (nullable = true)//  |    |-- D: long (nullable = true)//  |-- C: long (nullable = true)df1.show// +--------+----+// |       A|   C|// +--------+----+// |[1,null]|null|// +--------+----+df2.show// +--------+---+// |       A|  C|// +--------+---+// |[1,null]|  2|// +--------+---+df3.show// +-----+---+// |    A|  C|// +-----+---+// |[1,3]|  2|// +-----+---+

注意事项

此解决方案取决于数据源。它可能与其他来源一起使用或可能不一起使用,甚至导致记录格式错误。

zero323已回答了问题,但在Scala中。这是同一件事,但是在Java中。

public void evolvingSchema() {    String versionOne = "{ "A": {"B": 1 } }";    String versionTwo = "{ "A": {"B": 1 }, "C": 2 }";    String versionThree = "{ "A": {"B": 1, "D": 3 }, "C": 2 }";    process(spark.getContext(), "1", versionOne);    process(spark.getContext(), "2", versionTwo);    process(spark.getContext(), "2", versionThree);}private static void process(JavaSparkContext sc, String version, String data) {    StructType schema = DataTypes.createStructType(Arrays.asList( DataTypes.createStructField("A",         DataTypes.createStructType(Arrays.asList(      DataTypes.createStructField("B", DataTypes.LongType, true),         DataTypes.createStructField("D", DataTypes.LongType, true))), true), DataTypes.createStructField("C", DataTypes.LongType, true)));    SQLContext sqlContext = new SQLContext(sc);    Dataframe df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));    try {        df.select("C").collect();    } catch(Exception e) {        System.out.println("Failed to C for " + version);    }    try {        df.select("A.D").collect();    } catch(Exception e) {        System.out.println("Failed to A.D for " + version);    }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/609474.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号