栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink订阅及消费Hologres Binlog

Flink订阅及消费Hologres Binlog

订阅Hologres Binlog 要求

Hologres V0.9及以上版本。 开启Binlog

Hologres默认关闭Binlog,可以通过binlog.level和binlog.ttl开启该功能。

列存表开启Binlog的成本大于行存表。

使用限制

V0.10以下版本,已存在的表无法通过修改表属性的方式开启Binlog,如需开启必须重建表。不支持消费分区表的Binlog。对于更新频繁的场景,建议使用行存表开启Binlog。 使用示例

begin;
create table test_message_src(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_message_src', 'orientation', 'row');--创建行存表test_message_src
call set_table_property('test_message_src', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog功能
call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
commit;

参数说明:

binlog.level:是否开启Binlog,replica(开启) 、none(关闭)。binlog.ttl:Binlog的TTL,单位秒。默认30天,即2592000。 按需开启Binlog

HologresV1.1之后版本支持。

开启binlog

-- 设置表属性开启Binlog功能
begin;
call set_table_property('test_message_src', 'binlog.level', 'replica');
commit;

-- 设置表属性,配置Binlog TTL时间,单位秒
begin;
call set_table_property('test_message_src', 'binlog.ttl', '2592000');
commit;

关闭binlog

-- 设置表属性关闭Binlog功能
begin; 
call set_table_property('bin_demo', 'binlog.level', 'none'); 
commit; 

修改binlog的TTL

begin; 
call set_table_property('bin_demo', 'binlog.ttl', '8640'); --单位秒 
commit;
Binlog格式说明

Binlog字段由Binlog系统字段和用户Table字段组成。如下所示:

字段名称字段类型说明
hg_binlog_lsnBIGINTBinlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。
hg_binlog_event_typeBIGINTBinlog的系统字段,表示当前Record所表示的修改类型。
hg_binlog_timestamp_usBIGINTBinlog的系统字段,系统时间戳,单位为us。
user_table_column_1用户自定义用户Table字段。
user_table_column_n用户自定义用户Table字段。

注意事项:

hg_binlog_event_type有四种取值:

DELETe=2,表示当前Binlog为删除记录。INSERT=5,表示当前Binlog为插入新记录。BEFORE_UPDATE=3,表示当前Binlog为一条更新前的记录。AFTER_UPDATE=7,表示当前Binlog为一条更新后的记录。 UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。用户字段顺序与DDL定义顺序一致。 Flink和Blink实时消费Hologres Binlog Flink实时消费Binlog

Flink VVP-2.4及以上版本,支持Hologres Connector实时消费Binlog,如下:

使用DDL语句创建源表 源表 DDL(非CDC模式)

Source消费的Binlog数据作为普通的Flink数据传递给下游节点,所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='',
  'tablename'='',
  'username'='',
  'password'='',
  'endpoint'='',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
源表 DDL(CDC模式)

Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='',
  'tablename'='',
  'username'='',
  'password'='',
  'endpoint'='',
  'binlog' = 'true',
  'cdcMode' = 'true'
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
参数说明

hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。

参数名称是否必填说明
connector源表类型,值填写为hologres。
dbname读取的Hologres DB名称。
tablename读取的表名称。
username当前阿里云账号的AccessKey ID。
password当前阿里云账号的AccessKey Secret。
endpointHologres对应VPC的区域。
binlog是否为Binlog source。如果需要消费,需要将binlog参数设置为true。
cdcmode读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true。
binlogMaxRetryTimes读取Binlog出错重试次数,默认为60次。
binlogRetryIntervalMs读取Binlog出错重试间隔,默认为2000ms。
binlogBatchReadSize读取Binlog批量大小,默认为16个。
startTime启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。
配置Binlog并发

Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。

select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = 'Hologres表名';
Blink实时消费Binlog

Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog。

使用DDL语句创建源表
create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  type = 'hologres',
  'endpoint' = 'ip:port',--Hologres的vpc网络地址
  'username' = 'xxxx',--当前账号的AccessKey ID
  'password' = 'xxxx',--当前账号的AccessKey Secret
  'dbname' = 'xxxx',--Hologres的DB名
  'tablename' = 'xxxx',--Hologres的表名
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '256'
);

参数说明及Binlog并发配置同Flink。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775170.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号