Iceberg 的每一次提交操作都会生成新的文件(metadata 元数据文件、data 数据文件以及 snapshot 快照)。这些文件不会被自动合并或清理,需要自行处理。
Iceberg 写入时文件变化的详细说明请参考官网:https://iceberg.apache.org/
- 建表时在表属性中新增以下配置(Flink SQL 中写在 WITH 子句里,Hive DDL 中写在 TBLPROPERTIES 里):
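```properties
# 使用代码合并小文件后,如果不开启此配置,历史 metadata 文件同样不会被删除;开启后,每次提交都会删除超出保留上限的旧 metadata 文件。
# 例如保留上限为 5 时,metadata 目录中最多保留 5 个历史版本和 1 个最新版本的 *.metadata.json 文件,共 6 个。
# 注意:这两项配置只作用于 *.metadata.json 文件,不会删除 data 目录中的数据文件;历史数据文件需要通过下文代码中的快照过期(expireSnapshots)来清理。
# 启用提交后删除旧的元数据文件
write.metadata.delete-after-commit.enabled=true
# 配置保留的历史元数据版本数(例如配置为 5,则保留 5 份历史元数据和 1 份最新元数据)
write.metadata.previous-versions-max=5
```
- 建表示例: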
-- Example Iceberg table declared through the Iceberg Hive storage handler.
-- Table properties:
--   write.distribution-mode = hash : hash-distribute rows by partition key before writing (per Iceberg docs)
--   write.metadata.delete-after-commit.enabled = true : delete the oldest tracked metadata.json files after each commit
--   write.metadata.previous-versions-max = 5 : keep at most 5 previous metadata.json versions plus the current one
-- The two metadata settings do not remove data files, which are removed by snapshot expiration.
-- NOTE(review): PRIMARY KEY ... NOT ENFORCED is Flink SQL syntax, while STORED BY and TBLPROPERTIES
-- are Hive DDL. Confirm that the engine running this statement accepts both.
CREATE TABLE test.test_iceberg (
  id   BIGINT,
  name STRING,
  age  BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
  'write.distribution-mode' = 'hash',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5'
);
更多表属性配置请参考官方文档:https://iceberg.apache.org/docs/latest/configuration/(入门指南:https://iceberg.apache.org/getting-started/)
- 代码如下:
/**
 * Maintenance job for Iceberg tables: compacts small data files and expires old snapshots.
 *
 * Table names come from the comma-separated `dw.table.names` key of `application.properties`
 * (loaded from the classpath and overridden by JVM system properties). Each name is resolved
 * in the `dw` namespace of the Hive-backed Iceberg catalog.
 */
object Main {

  /** Snapshots older than this, measured back from the table's current snapshot, are expired. */
  private val SnapshotRetentionMillis: Long = TimeUnit.MINUTES.toMillis(5)

  /** Target size of each data file written by compaction (128 MiB). */
  private val TargetFileSizeBytes: Long = 128L * 1024 * 1024

  /**
   * Entry point: registers the Iceberg catalog, then compacts data files and expires old
   * snapshots for every configured table, one table at a time.
   *
   * @param args unused; configuration is read from `application.properties` and system properties
   * @throws IllegalArgumentException if `dw.table.names` is missing or lists no table names
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // NOTE(review): useBlinkPlanner() is deprecated from Flink 1.14 on; kept for the Flink version this job targets.
    val fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, fsSettings)

    // Job configuration: application.properties from the classpath, overridden by -D system properties.
    val tool = ParameterTool
      .fromPropertiesFile(
        Main.getClass.getClassLoader
          .getResourceAsStream("application.properties")
      )
      .mergeWith(ParameterTool.fromSystemProperties())

    // Register the Iceberg catalog with the Flink table environment.
    tableEnv.executeSql(
      """
        |CREATE CATALOG iceberg_catalog WITH (
        | 'type'='iceberg',
        | 'catalog-type'='hive',
        | 'uri'='thrift://*.*.*.*:7004',
        | 'clients'='5',
        | 'property-version'='1',
        | 'warehouse'='hdfs:///usr/hive/warehouse/iceberg'
        | )
        |""".stripMargin)

    // Load the catalog directly for the maintenance actions below.
    // NOTE(review): the loader receives an empty property map, so the 'uri'/'warehouse' values from
    // CREATE CATALOG above are not passed to it. Presumably the metastore is resolved from
    // hive-site.xml / Hadoop configuration on the classpath; confirm.
    val loader = CatalogLoader.hive("iceberg_catalog", new Configuration(), new java.util.HashMap())
    val catalog = loader.loadCatalog

    // A missing key or an all-blank value previously surfaced as an NPE or a failed table lookup.
    val names = parseTableNames(tool.get("dw.table.names"))
    require(names.nonEmpty, "dw.table.names is missing or contains no table names")

    names.foreach { name =>
      println(s"正在清理 $name 表!!!")
      val identifier = TableIdentifier.of(Namespace.of("dw"), name)
      val table = catalog.loadTable(identifier)

      // Compact small data files (not metadata files) into files of about TargetFileSizeBytes.
      Actions.forTable(table)
        .rewriteDataFiles
        .maxParallelism(1)
        .targetSizeInBytes(TargetFileSizeBytes)
        .execute()

      // Expire snapshots older than the retention window. A table that has never been written has
      // no current snapshot (null), so the snapshot must be checked before reading its timestamp.
      Option(table.currentSnapshot).foreach { snapshot =>
        val expireBefore = snapshot.timestampMillis - SnapshotRetentionMillis
        table.expireSnapshots.expireOlderThan(expireBefore).commit()
      }
      println(s" $name 表 清理完成!!!")
    }
  }

  /**
   * Splits a comma-separated list of table names, trimming whitespace and dropping empty entries.
   *
   * @param raw the raw property value; null when the key is absent
   * @return the table names in configured order, or an empty array when none are configured
   */
  private def parseTableNames(raw: String): Array[String] =
    Option(raw)
      .map(_.split(",").map(_.trim).filter(_.nonEmpty))
      .getOrElse(Array.empty[String])
}

- application.properties格式
dw.table.names=表名1,表名2



