Reference: https://iceberg.apache.org/docs/latest/flink/#writing-with-datastream
The following SQL statements are all executed in Flink's sql-client.
1. Create the catalog

-- A Hadoop catalog is used here
CREATE CATALOG iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'clients'='2',
  'warehouse'='hdfs://hadoop01:9000/warehouse/iceberg'
);

2. Create the table
CREATE TABLE IF NOT EXISTS iceberg_catalog.ods_db.test (
  etl_date VARCHAR(200),
  `id` INT,
  `name` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) PARTITIONED BY (etl_date) WITH (
  'write.format.default'='ORC',
  'format-version'='2',
  'write.update.mode'='copy-on-write',
  'write.delete.mode'='copy-on-write',
  'write.update.isolation-level'='serializable',
  'write.delete.isolation-level'='serializable'
);
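The same DDL can also be submitted from Java through the Table API instead of the sql-client. A minimal sketch, assuming the flink-table-api-java-bridge dependency is available in addition to the dependency list in the next section; the class name and the explicit CREATE DATABASE step are illustrative additions, not part of the original setup:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CreateIcebergTable {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same Hadoop catalog as in the sql-client DDL above
        tableEnv.executeSql(
                "CREATE CATALOG iceberg_catalog WITH ("
                        + " 'type'='iceberg', 'catalog-type'='hadoop', 'clients'='2',"
                        + " 'warehouse'='hdfs://hadoop01:9000/warehouse/iceberg')");

        // Illustrative extra step: the ods_db database must exist before the table is created in it
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.ods_db");

        // Same table DDL as above (remaining write.* options omitted for brevity)
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS iceberg_catalog.ods_db.test ("
                        + " etl_date VARCHAR(200), `id` INT, `name` VARCHAR,"
                        + " PRIMARY KEY (`id`) NOT ENFORCED"
                        + ") PARTITIONED BY (etl_date) WITH ("
                        + " 'write.format.default'='ORC', 'format-version'='2')");
    }
}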
II. Code

1. Dependencies (Maven coordinates)

org.apache.flink:flink-java:1.12.0
org.apache.flink:flink-streaming-java_2.12:1.12.0
org.apache.flink:flink-clients_2.12:1.12.0
org.apache.iceberg:iceberg-core:0.13.1
org.apache.iceberg:iceberg-flink-runtime:0.12.1

2. Data source
1,zhangsan
2,lisi
3,wangwu
4,zhaoliu

3. Code
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class WriteIceberg {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read the CSV sample line by line
        DataStreamSource<String> inputStream = env.readTextFile("/Users/xxx/Code/IDEA/flink_study/input/demo.csv");

        // Load the Iceberg table directly from its HDFS location (Hadoop catalog layout: <warehouse>/<db>/<table>)
        Configuration hadoopConf = new Configuration();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://10.67.222.16:9000/warehouse/iceberg/ods_db/test", hadoopConf);

        DataStream<RowData> dataStream = inputStream.map(line -> {
            // Split the source record (whether the input is JSON or CSV, do the parsing logic here)
            String[] fields = line.split(",");
            // The data must be converted to RowData; GenericRowData(3) matches the three table columns
            GenericRowData rowData = new GenericRowData(3);
            // Partition field (etl_date); String values must be wrapped as StringData
            rowData.setField(0, StringData.fromString("20220320"));
            // id
            rowData.setField(1, Integer.parseInt(fields[0]));
            // name (String values must be converted to StringData)
            rowData.setField(2, StringData.fromString(fields[1]));
            System.out.println(rowData);
            return rowData;
        });

        // Sink: append the RowData stream to the Iceberg table
        FlinkSink.forRowData(dataStream)
                .tableLoader(tableLoader)
                .build();

        env.execute();
    }
}
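The sink above appends new data on every run. The Iceberg documentation referenced at the top also exposes an overwrite flag on the same builder; a short sketch of that variant, reusing dataStream and tableLoader from the listing above:

// Dynamically overwrite existing data in the table instead of appending
FlinkSink.forRowData(dataStream)
        .tableLoader(tableLoader)
        .overwrite(true)
        .build();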

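To confirm that the rows actually landed in the table, it can be read back with the DataStream API as well (see the "Reading with DataStream" section of the same Iceberg docs). A minimal batch-read sketch against the same table location; the class name and job name are illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same Hadoop-table location as in WriteIceberg
        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://10.67.222.16:9000/warehouse/iceberg/ods_db/test", new Configuration());

        // Bounded (batch) scan of the table's current snapshot
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();

        batch.print();
        env.execute("Read Iceberg table");
    }
}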


