Hologres V0.9及以上版本。 开启Binlog
Hologres默认关闭Binlog,可以通过binlog.level和binlog.ttl开启该功能。
列存表开启Binlog的成本大于行存表。
使用限制V0.10以下版本,已存在的表无法通过修改表属性的方式开启Binlog,如需开启必须重建表。不支持消费分区表的Binlog。对于更新频繁的场景,建议使用行存表开启Binlog。 使用示例
begin;
create table test_message_src(
id int primary key,
title text not null,
body text);
call set_table_property('test_message_src', 'orientation', 'row');--创建行存表test_message_src
call set_table_property('test_message_src', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_message_src', 'binlog.level', 'replica');--设置表属性开启Binlog功能
call set_table_property('test_message_src', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,单位为秒
commit;
参数说明:
binlog.level:是否开启Binlog,replica(开启) 、none(关闭)。binlog.ttl:Binlog的TTL,单位秒。默认30天,即2592000。 按需开启Binlog
HologresV1.1之后版本支持。
开启binlog
-- 设置表属性开启Binlog功能
begin;
call set_table_property('test_message_src', 'binlog.level', 'replica');
commit;
-- 设置表属性,配置Binlog TTL时间,单位秒
begin;
call set_table_property('test_message_src', 'binlog.ttl', '2592000');
commit;
关闭binlog
-- 设置表属性关闭Binlog功能
begin;
call set_table_property('bin_demo', 'binlog.level', 'none');
commit;
修改binlog的TTL
begin;
call set_table_property('bin_demo', 'binlog.ttl', '8640'); --单位秒
commit;
Binlog格式说明
Binlog字段由Binlog系统字段和用户Table字段组成。如下所示:
| 字段名称 | 字段类型 | 说明 |
|---|---|---|
| hg_binlog_lsn | BIGINT | Binlog的系统字段,表示Binlog序号。Shard内部单调递增不保证连续,不同Shard之间不保证唯一和有序。 |
| hg_binlog_event_type | BIGINT | Binlog的系统字段,表示当前Record所表示的修改类型。 |
| hg_binlog_timestamp_us | BIGINT | Binlog的系统字段,系统时间戳,单位为us。 |
| user_table_column_1 | 用户自定义 | 用户Table字段。 |
| … | … | … |
| user_table_column_n | 用户自定义 | 用户Table字段。 |
注意事项:
hg_binlog_event_type有四种取值:
DELETe=2,表示当前Binlog为删除记录。INSERT=5,表示当前Binlog为插入新记录。BEFORE_UPDATE=3,表示当前Binlog为一条更新前的记录。AFTER_UPDATE=7,表示当前Binlog为一条更新后的记录。 UPDATE操作会产生两条Binlog记录,分别为更新前和更新后的记录。订阅Binlog功能会保证这两条记录是连续的且更新前的Binlog记录在前,更新后的Binlog记录在后。用户字段顺序与DDL定义顺序一致。 Flink和Blink实时消费Hologres Binlog Flink实时消费Binlog
Flink VVP-2.4及以上版本,支持Hologres Connector实时消费Binlog,如下:
使用DDL语句创建源表 源表 DDL(非CDC模式)Source消费的Binlog数据作为普通的Flink数据传递给下游节点,所有数据都是作为Insert类型的数据,可以根据业务情况选择如何处理特定hg_binlog_event_type类型的数据。
create table test_message_src_binlog_table( hg_binlog_lsn BIGINT, hg_binlog_event_type BIGINT, hg_binlog_timestamp_us BIGINT, id INTEGER, title VARCHAR, body VARCHAR ) with ( 'connector'='hologres', 'dbname'='源表 DDL(CDC模式)', 'tablename'=' ', 'username'=' ', 'password'=' ', 'endpoint'=' ', 'binlog' = 'true', 'binlogMaxRetryTimes' = '10', 'binlogRetryIntervalMs' = '500', 'binlogBatchReadSize' = '100' );
Source消费的Binlog数据,将根据hg_binlog_event_type自动为每行数据设置准确的Flink RowKind类型(INTERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),这样就能完成表的数据的镜像同步,类似MySQL和Postgres的CDC功能。
create table test_message_src_binlog_table( id INTEGER, title VARCHAR, body VARCHAR ) with ( 'connector'='hologres', 'dbname'='参数说明', 'tablename'=' ', 'username'=' ', 'password'=' ', 'endpoint'=' ', 'binlog' = 'true', 'cdcMode' = 'true' 'binlogMaxRetryTimes' = '10', 'binlogRetryIntervalMs' = '500', 'binlogBatchReadSize' = '100' );
hg_binlog_xxx开头的三个字段表示Binlog的系统字段,命名和类型不支持修改。其余字段需要和用户字段一一对应,且必须为小写。
| 参数名称 | 是否必填 | 说明 |
|---|---|---|
| connector | 是 | 源表类型,值填写为hologres。 |
| dbname | 是 | 读取的Hologres DB名称。 |
| tablename | 是 | 读取的表名称。 |
| username | 是 | 当前阿里云账号的AccessKey ID。 |
| password | 是 | 当前阿里云账号的AccessKey Secret。 |
| endpoint | 是 | Hologres对应VPC的区域。 |
| binlog | 是 | 是否为Binlog source。如果需要消费,需要将binlog参数设置为true。 |
| cdcmode | 否 | 读取Binlog时是否采用CDC模式。如果是CDC模式,需要将cdcmode参数设置为true。 |
| binlogMaxRetryTimes | 否 | 读取Binlog出错重试次数,默认为60次。 |
| binlogRetryIntervalMs | 否 | 读取Binlog出错重试间隔,默认为2000ms。 |
| binlogBatchReadSize | 否 | 读取Binlog批量大小,默认为16个。 |
| startTime | 否 | 启动位点的时间。如果没有设置该参数,且作业没有从状态恢复,则从最早的Binlog开始消费Hologres数据。格式为yyyy-MM-dd hh:mm:ss。 |
Binlog订阅的并发等于Hologres中Table的Shard个数,请执行如下语句查看Shard数。Binlog并发建议执行计划配置,将其并发数与Binlog对应的Hologres中Table的Shard数保持一致。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = 'Hologres表名';Blink实时消费Binlog
Blink 3.7及以上版本,支持Hologres Connector实时消费Binlog。
使用DDL语句创建源表create table test_message_src_binlog_table( hg_binlog_lsn BIGINT, hg_binlog_event_type BIGINT, hg_binlog_timestamp_us BIGINT, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', 'endpoint' = 'ip:port',--Hologres的vpc网络地址 'username' = 'xxxx',--当前账号的AccessKey ID 'password' = 'xxxx',--当前账号的AccessKey Secret 'dbname' = 'xxxx',--Hologres的DB名 'tablename' = 'xxxx',--Hologres的表名 'binlog' = 'true', 'binlogMaxRetryTimes' = '10', 'binlogRetryIntervalMs' = '500', 'binlogBatchReadSize' = '256' );
参数说明及Binlog并发配置同Flink。



