栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Iceberg 合并小文件并删除历史(Flink)

Iceberg 合并小文件并删除历史(Flink)

Iceberg 合并小文件并删除历史(Flink)

Iceberg每一次操作都会产生多个数据文件(metadata、data、snapshot),需要自行合并清理。
详细Iceberg写入时文件变化请参考官网https://iceberg.apache.org/

  • 建表时新增with配置
    # iceberg使用代码合并小文件后如果不开启此配置,历史文件同样不会删除,开启后就会实现合并后清除历史文件(历史文件保留最大值为5,metadata和data里面文件数则始终保持为6个)
        # 启用提交后写入元数据删除
        write.metadata.delete-after-commit.enabled=true
        # 配置保留历史数量(比如配置为5,则元数据和数据都保留5份历史数据和1份最新数据)
        write.metadata.previous-versions-max=5
    
  • 建表示例:
    create table test.test_iceberg(
        id bigint ,
        name string,
        age bigint,
        PRIMARY KEY (id) NOT ENFORCED
    ) 
    STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
    TBLPROPERTIES (
     'write.distribution-mode'='hash',
     'write.metadata.delete-after-commit.enabled'='true',
     'write.metadata.previous-versions-max'='5'
    );
    

更多详细配置:https://iceberg.apache.org/getting-started/

  • 代码如下:
    object Main {
        def main(args: Array[String]): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            val fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
            val tableEnv = StreamTableEnvironment.create(env, fsSettings)
    
            // 获取配置文件
            val tool = ParameterTool
                .fromPropertiesFile(
                    Main.getClass.getClassLoader
                        .getResourceAsStream("application.properties")
                )
                .mergeWith(ParameterTool.fromSystemProperties())
    
            tableEnv.executeSql(
                """
                  |CREATE CATALOG iceberg_catalog WITH (
                  |  'type'='iceberg',
                  |  'catalog-type'='hive',
                  |  'uri'='thrift://*.*.*.*:7004',
                  |  'clients'='5',
                  |  'property-version'='1',
                  |  'warehouse'='hdfs:///usr/hive/warehouse/iceberg'
                  |  )
                  |""".stripMargin)
    
            // 加载catalog
            val loader = CatalogLoader.hive("iceberg_catalog", new Configuration(), new java.util.HashMap())
            val catalog = loader.loadCatalog
            val str = tool.get("dw.table.names")
            val names = str.split(",")
    
            // 遍历所有表名
            if (names.nonEmpty) {
                names.foreach(name => {
                    println(s"正在清理 $name 表!!!")
                    val identifier = TableIdentifier.of(Namespace.of("dw"), name)
                    // 加载表
                    val table = catalog.loadTable(identifier)
                
                    // 合并小文件(metadata)
                    Actions.forTable(table)
                        .rewriteDataFiles
                        .maxParallelism(1)
                        .targetSizeInBytes(128 * 1024 * 1024)
                        .execute
    
                    // 清除5分钟前历史快照
                    val snapshot = table.currentSnapshot
                    val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
                     
                    if (snapshot != null) table.expireSnapshots.expireOlderThan(old).commit
                    println(s" $name 表 清理完成!!!")
                })
            }
        }
    }
    
  • application.properties格式
    dw.table.names=表名1,表名2
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327014.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号