
Importing CSV/JSON Files into Iceberg with Flink

I. Creating the Table

Reference: https://iceberg.apache.org/docs/latest/flink/#writing-with-datastream
All of the SQL below is run in Flink's sql-client.

1. Create the Catalog
-- A Hadoop catalog is used here
CREATE CATALOG iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'clients'='2',
  'warehouse'='hdfs://hadoop01:9000/warehouse/iceberg'
);
2. Create the Table
-- the ods_db database must exist first (CREATE DATABASE iceberg_catalog.ods_db;)
CREATE TABLE IF NOT EXISTS iceberg_catalog.ods_db.test (
  etl_date VARCHAR(200),
  `id` INT,
  `name` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (etl_date)
WITH (
  'write.format.default'='ORC',
  'format-version'='2',
  'write.update.mode'='copy-on-write',
  'write.delete.mode'='copy-on-write',
  'write.update.isolation-level'='serializable',
  'write.delete.isolation-level'='serializable'
);
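The table above partitions on etl_date, a string column. In the Java program later in this post the value is hardcoded to 20220320; a common convention (an assumption here, not something the original states) is a yyyyMMdd load-date string, which java.time can produce:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class EtlDate {
    // Format a date as the yyyyMMdd string used for the etl_date partition
    // (hypothetical helper, not part of the original post)
    static String etlDate(LocalDate date) {
        return date.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    public static void main(String[] args) {
        System.out.println(etlDate(LocalDate.of(2022, 3, 20))); // prints 20220320
    }
}
```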
II. Code

1. Dependencies

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>1.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>1.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-core</artifactId>
  <version>0.13.1</version>
</dependency>
<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-flink-runtime</artifactId>
  <version>0.12.1</version>
</dependency>
2. Data Source (demo.csv)
1,zhangsan
2,lisi
3,wangwu
4,zhaoliu
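The map function in the program below splits each of these lines on the comma and converts the first field to an int. That parsing step can be sketched on its own in plain Java (class and method names here are illustrative, not from the original):

```java
public class CsvLineParser {
    // Minimal holder for one parsed record
    static class Record {
        final int id;
        final String name;
        Record(int id, String name) { this.id = id; this.name = name; }
    }

    // Split an "id,name" CSV line into a Record, mirroring the
    // fields[0]/fields[1] handling in the Flink map function
    static Record parse(String line) {
        String[] fields = line.split(",");
        return new Record(Integer.parseInt(fields[0]), fields[1]);
    }

    public static void main(String[] args) {
        Record r = parse("1,zhangsan");
        System.out.println(r.id + " " + r.name); // prints 1 zhangsan
    }
}
```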
3. Code
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class WriteIceberg {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source
        DataStreamSource<String> inputStream = env.readTextFile("/Users/xxx/Code/IDEA/flink_study/input/demo.csv");

        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://10.67.222.16:9000/warehouse/iceberg/ods_db/test", hadoopConf);

        DataStream<RowData> dataStream = inputStream.map(line -> {
            // Split the input record (put the parsing logic here,
            // whether the source is CSV or JSON)
            String[] fields = line.split(",");
            // The sink expects RowData
            GenericRowData rowData = new GenericRowData(3);
            // Partition field (etl_date); string values must be StringData
            rowData.setField(0, StringData.fromString("20220320"));
            // id
            rowData.setField(1, Integer.parseInt(fields[0]));
            // name (String values must be converted to StringData)
            rowData.setField(2, StringData.fromString(fields[1]));
            System.out.println(rowData);
            return rowData;
        });

        // Sink: append() attaches the Iceberg sink to the stream
        FlinkSink.forRowData(dataStream)
                .tableLoader(tableLoader)
                .append();

        env.execute();
    }
}
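As the comment in the map function notes, JSON input only changes the parsing logic before the same GenericRowData is built. As a sketch (the regex approach and field layout are assumptions; a real job would typically use a JSON library such as Jackson), a flat line like {"id":1,"name":"zhangsan"} could be handled this way:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonLineParser {
    private static final Pattern ID = Pattern.compile("\"id\"\\s*:\\s*(\\d+)");
    private static final Pattern NAME = Pattern.compile("\"name\"\\s*:\\s*\"([^\"]*)\"");

    // Extract id from a flat one-line JSON record (sketch only)
    static int parseId(String line) {
        Matcher m = ID.matcher(line);
        if (!m.find()) throw new IllegalArgumentException("no id in: " + line);
        return Integer.parseInt(m.group(1));
    }

    // Extract name from a flat one-line JSON record (sketch only)
    static String parseName(String line) {
        Matcher m = NAME.matcher(line);
        if (!m.find()) throw new IllegalArgumentException("no name in: " + line);
        return m.group(1);
    }

    public static void main(String[] args) {
        String line = "{\"id\":1,\"name\":\"zhangsan\"}";
        System.out.println(parseId(line) + " " + parseName(line)); // prints 1 zhangsan
    }
}
```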
Source: https://www.mshxw.com/it/773781.html (reprints should credit www.mshxw.com)