注意
仅适用于Blink 2.2.7及以上版本。
与数据总线DataHub、Kafka等数据源不同,全量MaxCompute源表通常作为有限流表使用。
DDL定义create table odps_source( id INT, user_name VARCHAR, content VARCHAR ) with ( type = 'odps', endPoint = 'http://service.cn.maxcompute.aliyun-inc.com/api', project = 'WITH参数', tableName = ' ', accessId = ' ', accessKey = ' ', `partition` = 'ds=2018****' --如果您的MaxCompute源表为非分区表,不声明该参数即可。 );
| 参数 | 说明 | 是否必填 | 备注 |
|---|---|---|---|
| endPoint | MaxCompute服务地址。 | 是 | 请参见Endpoint。 |
| tunnelEndpoint | MaxCompute Tunnel服务的连接地址。 | 否 | 请参见Endpoint。说明 VPC环境下为必填。 |
| project | MaxCompute项目名称。 | 是 | 无。 |
| tableName | MaxCompute表名。 | 是 | 无。 |
| accessId | AccessKey ID。 | 是 | 无。 |
| accessKey | AccessKey Secret。 | 是 | 无。 |
| partition | 分区名。 | 否 | 只存在一级分区的全量MaxCompute表例如,如果只存在1个分区列ds,则partition` = 'ds=20180905'`表示读`ds=20180905`分区的数据。存在多级分区的全量MaxCompute表例如,如果存在2个分区列`ds`和`hh`,则partition='ds=20180905,hh=*'表示读ds=20180905分区的数据。说明 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明``partition= 'ds=20180905',则不会读取任何分区。 |
| subscribeNewPartition | 是否监听符合条件的新分区。 | 否 | 默认值为false,不监听新产生的分区。说明subscribeNewPartition参数值为true时,不可以指定partition的参数值 ,否则会造成无法读取新产生的分区的状况。该参数只在Blink 3.4.4版本临时存在,Blink 3.5.0版本废弃该参数。 |
| subscribeIntervalInSec | 监听新分区的间隔。 | 否 | 默认值为30,单位为秒。说明 监听间隔设置太小,会对全量MaxCompute metaData服务造成压力,有可能导致监听服务失败。 |
| maxPartitionCount | 未设置Partition参数时,读取当前分区表的分区个数。 | 否 | 默认值为100。说明 仅Blink 3.0及以上版本支持该参数。 |
| 全量MaxCompute字段类型 | 实时计算Flink版字段类型 |
|---|---|
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | VARCHAR |
| DECIMAL | DECIMAL |
| BINARY | VARBINARY |
| STRING | VARCHAR |
包含全量MaxCompute源表的实时计算Flink版作业代码示例如下。
CREATE TABLE odps_source ( cid varchar, rt DOUBLE, ) with ( type = 'odps', endPoint = '创建增量MaxCompute源表', project = ' ', tableName = ' ', accessId = ' ', accessKey = ' ', partition = 'ds=20190712' ); CREATE TABLE test ( cid varchar, invoke_count BIGINT ) with ( type='print' ); INSERT INTO test SELECt cid, count(*) as invoke_count FROM odps_source GROUP BY cid;
注意
仅适用于Blink 3.5.0 hotfix及以上版本。增量MaxCompute源表不支持作为维表使用。增量MaxCompute源表只支持MaxCompute分区表,不支持非分区表。 语法示例
create table odps_source(
id int,
user_name VARCHAR,
content VARCHAR
) with (
type = 'continuous-odps',
endPoint = 'your_end_point_name',
project = 'your_project_name',
tableName = 'your_table_name',
accessId = 'your_access_id',
accessKey = 'your_access_key',
startPartition = 'ds=20180905'
);
WITH参数
| 参数 | 说明 | 是否必填 | 备注 |
|---|---|---|---|
| type | connector类型 | 是 | 固定值为continuous-odps。 |
| endPoint | MaxCompute服务本身的连接地址 | 是 | 请参见Endpoint。 |
| tunnelEndpoint | MaxCompute Tunnel服务的连接地址 | 是:VPC环境必填否:其他非VPC环境 | 请参见Endpoint。 |
| project | 表所属的project名称 | 是 | 无 |
| tableName | 表名 | 是 | 无 |
| accessId | AccessKey ID | 是 | 无 |
| accessKey | AccessKey Secret | 是 | 无 |
| startPartition | 指定读取的起始分区。系统加载分区列表时,会把每个分区列表的所有分区和startPartition按照字母顺序进行比较,加载满足条件的分区的数据。此外,增量MaxCompute源表可以持续监听增量MaxCompute分区表。读完已有的分区后,任务不会退出,且持续监听并读入新分区。说明增量MaxCompute源表中必须存在一级分区,二级分区可选。如果指定二级分区,必须写在一级分区的后面。如果startPartition指定的分区不存在,系统会从下一个分区开始读取数据。 | 是 | 例如,指定startPartition是ds=20191201,表示加载增量MaxCompute表里所有满足ds >= 20191201的分区数据。如果一个增量MaxCompute分区表,有一级分区ds和二级分区type两个分区列,假设表里有以下5个分区:ds=20191201,type=ads=20191201,type=bds=20191202,type=ads=20191202,type=bds=20191202,type=c不同startPartition,满足分区的列表如下:ds=20191202ds=20191202,type=ads=20191202,type=bds=20191202,type=cds=20191201,type=cds=20191202,type=ads=20191202,type=bds=20191202,type=c |
| MaxCompute字段类型 | 实时计算Flink版字段类型 |
|---|---|
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| DATETIME | TIMESTAMP |
| TIMESTAMP | TIMESTAMP |
| DECIMAL | DECIMAL |
| BINARY | VARBINARY |
| STRING | VARCHAR |
注意
增量MaxCompute源表暂不支持CHAR、VARCHAR、ARRAY、MAP和STRUCT数据类型。您可以临时使用STRING替换VARCHAR。 代码示例
增量MaxCompute源表每天产生一个分区,分区列是ds,从ds=20191201
--读增量MaxCompute表,读取的分区范围是[ds=20191201,∞)。
CREATE TABLE odps_source (
cid VARCHAR,
rt DOUBLE,
) with (
type = 'continuous-odps',
endPoint = 'your_end_point_name',
project = 'your_project_name',
tableName = 'your_table_name',
accessId = 'xxxx',
accessKey = 'xxxx',
startPartition = 'ds=20191201'
);
CREATE TABLE test (
cid VARCHAR,
rt DOUBLE,
) with (
type='print'
);
INSERT INTO test
SELECt
cid, rt FROM odps_source;



