栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Blink SQL之创建MaxCompute源表

Blink SQL之创建MaxCompute源表

创建MaxCompute源表 创建全量MaxCompute源表

注意

仅适用于Blink 2.2.7及以上版本。

与数据总线DataHub、Kafka等数据源不同,全量MaxCompute源表通常作为有限流表使用。

DDL定义
create table odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) with (
  type = 'odps',
  endPoint = 'http://service.cn.maxcompute.aliyun-inc.com/api',
  project = '',
  tableName = '',
  accessId = '',
  accessKey = '',
  `partition` = 'ds=2018****' --如果您的MaxCompute源表为非分区表,不声明该参数即可。
);
WITH参数
参数说明是否必填备注
endPointMaxCompute服务地址。请参见Endpoint。
tunnelEndpointMaxCompute Tunnel服务的连接地址。请参见Endpoint。说明 VPC环境下为必填。
projectMaxCompute项目名称。无。
tableNameMaxCompute表名。无。
accessIdAccessKey ID。无。
accessKeyAccessKey Secret。无。
partition分区名。只存在一级分区的全量MaxCompute表例如,如果只存在1个分区列ds,则partition` = 'ds=20180905'`表示读`ds=20180905`分区的数据。存在多级分区的全量MaxCompute表例如,如果存在2个分区列`ds`和`hh`,则partition='ds=20180905,hh=*'表示读ds=20180905分区的数据。说明 分区过滤时需要声明所有分区的值。例如,上述示例中,只声明``partition= 'ds=20180905',则不会读取任何分区。
subscribeNewPartition是否监听符合条件的新分区。默认值为false,不监听新产生的分区。说明subscribeNewPartition参数值为true时,不可以指定partition的参数值 ,否则会造成无法读取新产生的分区的状况。该参数只在Blink 3.4.4版本临时存在,Blink 3.5.0版本废弃该参数。
subscribeIntervalInSec监听新分区的间隔。默认值为30,单位为秒。说明 监听间隔设置太小,会对全量MaxCompute metaData服务造成压力,有可能导致监听服务失败。
maxPartitionCount未设置Partition参数时,读取当前分区表的分区个数。默认值为100。说明 仅Blink 3.0及以上版本支持该参数。
类型映射
全量MaxCompute字段类型实时计算Flink版字段类型
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATETIMETIMESTAMP
TIMESTAMPTIMESTAMP
VARCHARVARCHAR
DECIMALDECIMAL
BINARYVARBINARY
STRINGVARCHAR
代码示例

包含全量MaxCompute源表的实时计算Flink版作业代码示例如下。

CREATE TABLE odps_source (
  cid varchar,
  rt DOUBLE,
) with (
  type = 'odps', 
  endPoint = '',
  project = '',
  tableName = '',
  accessId = '',
  accessKey = '',
  partition = 'ds=20190712'
);

CREATE TABLE test (
  cid varchar,
  invoke_count BIGINT
) with (
  type='print'
);

INSERT INTO test 
SELECt 
  cid,
  count(*) as invoke_count
FROM odps_source GROUP BY cid;
创建增量MaxCompute源表

注意

仅适用于Blink 3.5.0 hotfix及以上版本。增量MaxCompute源表不支持作为维表使用。增量MaxCompute源表只支持MaxCompute分区表,不支持非分区表。 语法示例

create table odps_source(
    id int,
    user_name VARCHAR,
    content VARCHAR
) with (
    type = 'continuous-odps',
    endPoint = 'your_end_point_name',
    project = 'your_project_name',
    tableName = 'your_table_name',
    accessId = 'your_access_id',
    accessKey = 'your_access_key',
    startPartition = 'ds=20180905'
);
WITH参数
参数说明是否必填备注
typeconnector类型固定值为continuous-odps。
endPointMaxCompute服务本身的连接地址请参见Endpoint。
tunnelEndpointMaxCompute Tunnel服务的连接地址是:VPC环境必填否:其他非VPC环境请参见Endpoint。
project表所属的project名称
tableName表名
accessIdAccessKey ID
accessKeyAccessKey Secret
startPartition指定读取的起始分区。系统加载分区列表时,会把每个分区列表的所有分区和startPartition按照字母顺序进行比较,加载满足条件的分区的数据。此外,增量MaxCompute源表可以持续监听增量MaxCompute分区表。读完已有的分区后,任务不会退出,且持续监听并读入新分区。说明增量MaxCompute源表中必须存在一级分区,二级分区可选。如果指定二级分区,必须写在一级分区的后面。如果startPartition指定的分区不存在,系统会从下一个分区开始读取数据。例如,指定startPartition是ds=20191201,表示加载增量MaxCompute表里所有满足ds >= 20191201的分区数据。如果一个增量MaxCompute分区表,有一级分区ds和二级分区type两个分区列,假设表里有以下5个分区:ds=20191201,type=ads=20191201,type=bds=20191202,type=ads=20191202,type=bds=20191202,type=c不同startPartition,满足分区的列表如下:ds=20191202ds=20191202,type=ads=20191202,type=bds=20191202,type=cds=20191201,type=cds=20191202,type=ads=20191202,type=bds=20191202,type=c
类型映射
MaxCompute字段类型实时计算Flink版字段类型
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATETIMETIMESTAMP
TIMESTAMPTIMESTAMP
DECIMALDECIMAL
BINARYVARBINARY
STRINGVARCHAR

注意

增量MaxCompute源表暂不支持CHAR、VARCHAR、ARRAY、MAP和STRUCT数据类型。您可以临时使用STRING替换VARCHAR。 代码示例

增量MaxCompute源表每天产生一个分区,分区列是ds,从ds=20191201

--读增量MaxCompute表,读取的分区范围是[ds=20191201,∞)。
CREATE TABLE odps_source (
    cid VARCHAR,
    rt DOUBLE,
) with (
    type = 'continuous-odps', 
    endPoint = 'your_end_point_name', 
    project = 'your_project_name',
    tableName = 'your_table_name',
    accessId = 'xxxx',
    accessKey = 'xxxx',
    startPartition = 'ds=20191201'
);

CREATE TABLE test (
    cid VARCHAR,
    rt DOUBLE,
) with (
    type='print'
);

INSERT INTO test 
SELECt 
    cid, rt FROM odps_source;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735967.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号