几个问题前置:需要对 iceberg schema 信息有基本了解
Time: 2022/02/15 第一版
能做什么?为什么能这么做?
表任意位置新增字段表任务位置删除字段分区字段变更
思考的问题
中间字段删除后,历史的数据对应的字段怎么读?中间字段新增后,历史的数据怎么读?分区变更后,历史和新增数据怎么读?等等 代码分析
iceberg 写 parquet 文件格式分析
如上图,Propeties 中含有本次写数据时 schema 的信息。原本表的字段为 id,name,age,dt,删除 name 字段后写入的数据文件,id 分别保留 iceberg 原本的 id,1,3,4,这时候如果我在 id 后面新增一个字段,iceberg.schema 就会变成 [{“id”:1,“name”:“id”,“required”:true,“type”:“int”},{“id”:5,“name”:“new_col”,“required”:true,“type”:“string”},{“id”:3,“name”:“age”,“required”:true,“type”:“int”},{“id”:4,“name”:“dt”,“required”:true,“type”:“string”}]
新增的列 id 从 5 开始
这里结合 Spark 的部分实现方式分析
RowDataReaderprivate CloseableIterableSparkParquetReaders.buildReadernewParquetIterable( InputFile location, FileScanTask task, Schema readSchema, Map idToConstant) { Parquet.ReadBuilder builder = Parquet.read(location) .reuseContainers() .split(task.start(), task.length()) .project(readSchema) // 这里创建 Parquet reader 进行 parquet 读取 .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) .filter(task.residual()) .caseSensitive(caseSensitive); if (nameMapping != null) { builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } return builder.build(); }
@SuppressWarnings("unchecked")
public static ParquetValueReader buildReader(Schema expectedSchema,
MessageType fileSchema,
Map idToConstant) {
// 判断是否含有 iceberg.id,也就是上面 parquet 文件中 iceberg.schema 中的字段 id
if (ParquetSchemaUtil.hasIds(fileSchema)) {
// 有的话,递归对每个 expected schema(我们想查询的字段)构建对应的 Parquet Reader
return (ParquetValueReader)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema, idToConstant));
} else {
return (ParquetValueReader)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema, idToConstant));
}
}
ReadBuilder
Visitor 模式,直接看 ReaderBuilder 的实现,截取比较重要的 struct 方法的实现
例子:
expectSchema: 从上面代码来看,iceberg 在写入数据时,会带上此时表 schema 的 id,require 相关信息,读取的时候,会把当前表的 schema 和 数据文件中的 schema id 进行比对添加。 对于 rename,iceberg 不变更 id,更不影响查询对于任意位置新增删除字段,都是通过 id 映射,就算新增同名字段,对 iceberg 来说是新的 id。所以不影响其它字段的查询,也不会有数据偏移的问题反过来再思考前面遗留的问题等等@Override
public ParquetValueReader> struct(Types.StructType expected, GroupType struct,
List



