栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flink1.11.1集成iceberg0.11(hadoop

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink1.11.1集成iceberg0.11(hadoop

上一篇文章讲了flink集成iceberg需要的包,这篇讲下flink集成iceberg一些简单的demo测试

1. 进入 sql-client.sh 命令行

# -j 指定 iceberg flink的集成包
# -d 指定配置文件

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`


./bin/sql-client.sh embedded -j /opt/iceberg0.11/iceberg-flink-runtime-0.11.0.jar shell

如果 iceberg包已经放在 flink lib目录下 可以执行
./bin/sql-client.sh embedded 进入命令行

2. 创建存放链接kafka源的库和表

创建kafka 的库
create database default_catalog.kafka_source;

创建kafka的表
CREATE TABLE default_catalog.kafka_source.kafka_source_zhengtx(
 id BIGINT,
 data BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'kafka_source_zhengtx',
 'properties.bootstrap.servers' = '172.18.34.203:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'csv.field-delimiter' = ',',
 'scan.startup.mode' = 'earliest-offset',
 'csv.ignore-parse-errors' = 'true'
);

3. 用java代码向kafka的topic里面写入数据

public class KafkaProduceA {
    public static void main(String[] args) throws IOException, InterruptedException {
        Properties props = new Properties();
//        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyContent.BOOTSTRAP_SERVERS_CONFIG);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(props);
        int count = 0;
        while(true){
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            String input = br.readLine();
//            String input = count + "," + count;
            System.out.println("system.in:" + input);
            ProducerRecord pushRecord = new ProducerRecord(MyContent.TOPIC_A_CANAL,input);
            producer.send(pushRecord);
            Thread.sleep(1000);
            count ++;
        }
    }
}

4. 在java端 输入数据,发送到kafka

1,1
2,2
3,3

5. 在flink sql-client中查询kafka表数据

select * from default_catalog.kafka_source.kafka_source_zhengtx;


                        id                         data
                         1                         1
                         2                         2
                         3                         3

说明用flink消费kafka数据成功了,下面测试kafka数据写入iceberg_t1表

6. 创建iceberg 的库和表

创建iceberg的库(iceberg的catalog在sql-client-default.yaml里面配置)
use catalog hadoop_catalog;
create database iceberg;

创建iceberg的表
CREATE TABLE hadoop_catalog.iceberg.iceberg_t1 (
    id BIGINT COMMENT 'unique id',
    data BIGINT
) with ('write.distribution-mode'='hash');  

启动flink任务,读取kafka数据插入 iceberg_t1 

SET execution.type = streaming ;
SET table.dynamic-table-options.enabled=true;
INSERT INTO hadoop_catalog.iceberg.iceberg_t1 select id,data from default_catalog.kafka_source.kafka_source_zhengtx ;

查看iceberg_t1表数据(固定窗口,不会随时间更新)
select * from hadoop_catalog.iceberg.iceberg_t1;

创建跟新窗口查看iceberg_t1表数据(1s 钟更新一次)
select * from hadoop_catalog.iceberg.iceberg_t1  ;


在java端输入数据,写入kafka源,可以看到iceberg_t1表数据也更新了

到这里,从kafka增量读取数据,写入iceberg的iceberg_t1表就成功了

7. 创建iceberg_t2,从iceberg_t1同步数据到iceberg_t2

创建iceberg_t2
CREATE TABLE hadoop_catalog.iceberg.iceberg_t2 (
    id BIGINT COMMENT 'unique id',
    data BIGINT
) with ('write.distribution-mode'='hash'); 

启动flink任务,从iceberg_t1读取数据写入iceberg_t2
use catalog hadoop_catalog;
use iceberg;

insert into iceberg_t2 select id,data from iceberg_t1  ;

执行flinksql任务,查看iceberg_t2表数据
select * from hadoop_catalog.iceberg.iceberg_t2  ;
可以看到iceberg_t2表数据也更新了

在java端输入数据,写入kafka源,持续观察iceberg_t2表数据,发现新数据也写入了。

到这里,

  1. 创建kafka表,flink增量读取kafka数据
  2. flink增量读取的kafka数据,增量写入iceberg表 iceberg_t1
  3. flink增量读取iceberg表iceberg_t1表数据,写入icebergb表 iceberg_t2

整个流程就跑通了

参数备注
monitor-interval: 数据监控频率
streaming: 是否是流式任务
table.dynamic-table-options.enabled 开启option参数配置

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/531643.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号