栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

Flink 使用 SQL 读取 Kafka 写入到Doris表中

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink 使用 SQL 读取 Kafka 写入到Doris表中

这次我们演示的是整合Flink Doris Connector 到 Flink 里使用,通过Flink Kafka Connector,通过SQL的方式流式实时消费Kafka数据,利用Doris Flink Connector将数据插入到Doris表中。

这里的演示我们是用过Flink SQL Client来进行的,

1. 准备条件

这里我们使用的环境是

  1. Doris-0.14.7
  2. doris-flink-1.0-SNAPSHOT.jar,这个可以自己去编译
  3. Flink-1.12.5
  4. flink-connector-kafka_2.11-1.12.1.jar
  5. kafka-clients-2.2.2.jar
  6. kafka-2.2.2

2. Kafka数据准备
  1. 首先我们在kafka下创建一个topic:
bin/kafka-topics.sh --create --topic user_behavior --replication-factor 1 --partitions 1 --zookeeper 10.220.147.155:2181,10.220.147.156:2181,10.220.147.157:2181 
  1. 向user_behavior topic队列中添加数据

    bin/kafka-console-producer.sh --broker-list 10.220.147.155:9092,10.220.147.156:9092,10.220.147.157:9092 --topic user_behavior
    

    示例数据如下:

    {"user_id": "54346222", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    {"user_id": "662863337", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    

    这里是演示,你可以将这个示例数据中的数据进行复制修改

3. doris 数据库建表

这里我们采用的是唯一主键模型

CREATE TABLE user_log (
    user_id VARCHAr(20),
    item_id VARCHAr(30),
    category_id VARCHAr(30),
    behavior VARCHAr(30),
    ts varchar(20)
) ENGINE=OLAP
UNIQUE KEY(`user_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
   "replication_num" = "1",
   "in_memory" = "false",
    "storage_format" = "V2"
);

4.实战演示

5.总结

我们到这里整个的演示就结束了,使用Doris flink connector可以很容易的通过Flink SQL方式整合各种异构数据源,导入到Doris数仓中,非常的方便

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279217.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号