栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Blink SQL之创建日志服务SLS源表

Blink SQL之创建日志服务SLS源表

创建日志服务SLS源表

仅适用于Blink 1.4.5及以上版本。

什么是日志服务

日志服务SLS是针对日志类数据的一站式服务,对于日志服务而言,数据格式类似JSON,示例如下。

{
    "a": 1000,
    "b": 1234,
    "c": "li"
}

日志服务本身是流数据存储,Blink能将其作为流式数据的输入。

语法示例

日志服务源表DDL示例如下(代码中的sls代表日志服务)。

create table sls_stream(
  a INT,
  b INT,
  c VARCHAR
) with (
  type ='sls',  
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='',
  accessKey ='',
  startTime = '2017-07-05 00:00:00',
  project ='',
  logStore ='',
  consumerGroup =''
);
WITH参数
参数说明是否必选备注
type源表类型固定值为sls。
endPoint消费端点信息服务入口
accessIdAccessKey ID
accessKeyAccessKey Secret
project读取的SLS项目名称
logStoreProject下的具体的LogStore名称
startTime日志消费的开始时间
consumerGroup消费组名您可以自定义消费组名(没有固定格式)。
heartBeatIntervalMills消费客户端心跳间隔时间默认值为10000,单位为毫秒。
maxRetryTimes读取最大尝试次数默认值为5。
batchGetSize单次读取logGroup的条数默认值为100。
columnErrorDebug是否打开调试开关默认值为false,不打开。如果选择打开,则打印解析异常的日志。
startupMode启动消费模式取值如下:
TIMESTAMP(默认值):每个Shard从指定时间开始消费。
Earliest:每个Shard从最早位置开始消费。
Latest:每个Shard从最新位置开始消费。
Group_Offsets:每个Shard优先从服务端保存的Checkpoint开始消费,必须指定consumerGroup。消费模式有以下几种情况:
如果从Flink State中恢复状态成功,则从Flink状态中的Checkpoint开始消费。
如果从Flink State中恢复状态失败:
如果consumerGroup中存在Checkpoint,则尝试从consumerGroup的Checkpoint开始消费。
如果consumerGroup中不存在Checkpoint:
指定了startTime:从startTime开始消费。
未指定startTime:每个shard从最早位置开始消费。
注意
仅Blink 3.6.5及以上版本支持该参数;并且仅当state中不存在Checkpoint时,上述配置才有效。

备注

1.6.0及以下版本,在指定consumerGroup的Shards数目时,可能会影响读取性能。

SLS暂不支持MAP类型的数据。

SLS对于不存在字段会置为Null。

字段顺序支持无序(建议字段顺序和表中定义一致)。

输入数据源为JSON形式时,注意定义分隔符,并且需要采用内置函数JSON_VALUE分析,否则就会解析失败,报错如下。

2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing **error**, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]

batchGetSize设置不能超过1000,否则会报错。

batchGetSize指明的是logGroup获取的数量。如果单条logItem的大小和batchGetSize都很大,有可能会导致频繁的垃圾回收(Garbage Collection),这种情况下该参数应调小。

类型映射

建议您使用该对应关系进行DDL声明。

日志服务字段类型实时计算Blink版字段类型
STRINGVARCHAR
属性字段

目前Flink SQL默认支持3个SLS属性字段的获取,也支持其它自定义字段的写入。

字段名注释说明
__source__消息源
__topic__消息主题
__timestamp__日志时间
代码示例
create table sls_input(
  a int, 
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='',
  accessKey ='',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);

create table print_output(
 a int,
 b int,
 c varchar 
) with (
  type='print'
);

INSERT INTO print_output
SELECt 
  a, b, c
from sls_input;
常见问题

    Q:为什么Job的整体延迟增加,或有窗口聚合的Job无输出?

    A:如果一个Partition没有新数据写入,会导致上述情况,只需要把并发数调整为读写的Partition数量即可。

    Q:如何设置并发数?

    A:并发数建议等于Partition数量,否则当两个Partition读取速度差异较大时,理论上在追数据场景,存在数据被过滤掉和数据延迟的风险。

    Q:Flink Job延迟增大应该如何排查?

    A:SLS源表可能会发生Shard分裂,分裂后的Shard index会不连续,导致Flink延迟增大。如果发现Flink Job延迟增大,请查看SLS源是否发生分裂。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710317.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号