A JSON source is not very well suited for data with an evolving schema (how about Avro or Parquet instead), but the simple solution is to use the same schema for all sources and make the new fields optional/nullable:
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true),
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)
))

You can pass the schema to DataFrameReader like this:
val rddV1 = sc.parallelize(Seq("""{ "A": {"B": 1 } }"""))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("""{ "A": {"B": 1 }, "C": 2 }"""))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("""{ "A": {"B": 1, "D": 3 }, "C": 2 }"""))
val df3 = sqlContext.read.schema(schema).json(rddV3)

and you get a consistent structure, independent of the variant:
require(df1.schema == df2.schema && df2.schema == df3.schema)
Missing columns are automatically set to null:
df1.printSchema
// root
//  |-- A: struct (nullable = true)
//  |    |-- B: long (nullable = true)
//  |    |-- D: long (nullable = true)
//  |-- C: long (nullable = true)

df1.show
// +--------+----+
// |       A|   C|
// +--------+----+
// |[1,null]|null|
// +--------+----+

df2.show
// +--------+---+
// |       A|  C|
// +--------+---+
// |[1,null]|  2|
// +--------+---+

df3.show
// +-----+---+
// |    A|  C|
// +-----+---+
// |[1,3]|  2|
// +-----+---+
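Because every variant shares the same schema, a single projection works across all of them. A minimal sketch, reusing df1 from above (the commented output is what that input should produce):

// Project nested and top-level fields uniformly; absent fields are null.
df1.select("A.B", "A.D", "C").show
// +---+----+----+
// |  B|   D|   C|
// +---+----+----+
// |  1|null|null|
// +---+----+----+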
Note:
This solution is data source dependent. It may or may not work with other sources, or could even result in malformed records.
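To illustrate, a minimal sketch reusing the schema above; the input record and the names rddBad/dfBad are hypothetical, and the exact behavior varies by Spark version:

// Hypothetical record where "C" holds a string instead of a long.
val rddBad = sc.parallelize(Seq("""{ "A": {"B": 1 }, "C": "not-a-number" }"""))
val dfBad = sqlContext.read.schema(schema).json(rddBad)
dfBad.show
// Depending on the Spark version and parse mode, the mismatched field,
// or the entire row, may come back as null instead of failing fast.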
zero323 has answered the question, but in Scala. This is the same thing, but in Java:
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    // spark here is presumably the answer's own wrapper exposing a JavaSparkContext.
    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "3", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    // Same shared schema as the Scala answer: every field nullable.
    StructType schema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("A", DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("B", DataTypes.LongType, true),
            DataTypes.createStructField("D", DataTypes.LongType, true))), true),
        DataTypes.createStructField("C", DataTypes.LongType, true)));

    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().schema(schema)
        .json(sc.parallelize(Arrays.asList(data)));

    try {
        df.select("C").collect();
    } catch (Exception e) {
        System.out.println("Failed to select C for version " + version);
    }
    try {
        df.select("A.D").collect();
    } catch (Exception e) {
        System.out.println("Failed to select A.D for version " + version);
    }
}
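Note that this uses the Spark 1.x Java API; on Spark 2.x the DataFrame class was replaced by Dataset&lt;Row&gt; and SparkSession supersedes SQLContext, but the shared-schema approach itself carries over unchanged.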


