Flink1.11开始就不在提供flink-shaded-hadoop-2-uber的支持,所以如果需要flink支持hadoop得配置环境变量HADOOP_CLASSPATH
[root@hadoop1 flink-1.11.0]# vim bin/config.sh export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 export HADOOP_CLASSPATH=`hadoop classpath` export PATH=$PATH:$HADOOP_CLASSPATH
目前Iceberg只支持flink1.11.x的版本,所以我这使用flink1.11.0,将构建好的Iceberg的jar包复制到flink下
[root@hadoop1 libs]# cd /opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/ [root@hadoop1 libs]# cp *.jar /opt/module/flink-1.11.0/lib/1.2、Flink SQL Client
1、启动flink集群,并启动flink sql client
bin/sql-client.sh embedded shell
2、使用 Catalogs 创建目录
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://mycluster/flink/warehouse/',
'property-version'='1'
);
或者修改 sql-client-defaults.yaml,添加以下内容
[root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs: - name: hadoop_catalog type: iceberg catalog-type: hadoop warehouse: hdfs://mycluster/flink/warehouse/
3、使用当前 catalog
use catalog hadoop_catalog;
4、建库建表
建库可以直接使用create database;建表需要指定分区,使用flink对接iceberg不能使用iceberg的隐藏分区。
5、写入与修改数据
flink默认使用流的方式插入数据,这个时候流的插入是不支持overwrite操作的。需要将插入模式进行修改SET execution.type = batch;,改成批的插入方式,再次使用overwrite插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming;会根据分区进行覆盖操作。
2、Flink API操作1、需要引入相关依赖包
2.1、读操作org.apache.iceberg iceberg-flink-runtime0.11.1 org.apache.hadoop hadoop-client${hadoop.version}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA");
batchRead(env, tableLoader);
streamingRead(env, tableLoader);
env.execute();
}
// 通过batch的方式去读取数据
public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.map(item -> item.getInt(0) + "t" + item.getString(1) + "t" + item.getInt(2) + "t" + item.getString(3)).print();
}
// 通过streaming的方式去读取数据,启动之后程序不会立马停止
public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader)
{
DataStream stream = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();
stream.print();
}
2.2、 写操作
// 采用的是batch批处理
public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
FlinkSink.forRowData(batch).tableLoader(tableB).build();
}
// 根据分区将数据进行覆盖操作
public static void overwriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
DataStream batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
FlinkSink.forRowData(batch).tableLoader(tableB).overwrite(true).build();
}
3、读写Flink存在的问题
- Flink 不支持 Iceberg 隐藏分区不支持通过计算列根据case class创建表不支持创建带水位线的表不支持添加列、删除列、重命名列Flink写iceberg需要使用checkpoint



