栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-SQL实现kafka处理后写入kafka

Flink-SQL实现kafka处理后写入kafka

基本信息

kafka版本   1.13.2
topic 
      原始数据topic    user_behavior 
      输出数据topic    after_binlog

读取kafka原始数据

CREATE TABLE user_behavior (
    id BIGINT,
    name STRING,
    flag STRING
) WITH (
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
    'connector.topic' = 'binlog',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
    'connector.properties.zookeeper.connect' = 'vm61:2181',  -- zookeeper 地址
    'connector.properties.bootstrap.servers' = 'vm61:9092',  -- kafka broker 地址
    'format.type' = 'json'  -- 数据源格式为 json
);

创建聚合数据及输出kafka信息

CREATE TABLE after_user_behavior (
  name STRING,
  pv BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'after_binlog',
  'properties.bootstrap.servers' = 'vm61:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

聚合查询语句

SELECt
  name,
  COUNT(*)
FROM user_behavior 
GROUP BY name;


输出数据

INSERT INTO after_user_behavior
SELECt
  name,
  COUNT(*)
FROM user_behavior 
GROUP BY name;

语句执行完成

原始数据内容为

输出数据内容为

所需jar包

任务信息

Upsert Kafka SQL 连接器
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 ‘csv’、‘json’、‘avro’。请参考格式页面以获取更多详细信息和格式参数。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327248.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号