仅适用于Blink 2.0及以上版本。仅适用于独享模式。Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。二进制数据不支持本地调试,语法检查没有问题请进行线上调试。
从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。
DDL定义Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。
create table kafka_stream( --必须和Kafka源表中的5个字段的顺序和类型保持一致。 messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT ) with ( type ='kafka010', topic = 'WITH参数', `group.id` = ' ', ... );
通用配置
| 参数 | 注释说明 | 是否必选 | 备注 |
|---|---|---|---|
| type | Kafka对应版本 | 是 | Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。 |
| topic | 读取的单个topic | 是 | 无 |
| topicPattern | 读取一批topic的表达式 | 否 | Topic用竖线(|)分隔。例如:topic1|topic2|topic3。 |
| startupMode | 启动位点 | 否 | 启动位点取值如下: GROUP_OFFSETS(默认值):根据Group读取。 EARLIEST:从Kafka最早分区开始读取。 LATEST:从Kafka最新位点开始读取。 TIMESTAMP:从指定的时间点读取。 |
| partitionDiscoveryIntervalMS | 定时检查是否有新分区产生 | 否 | Kafka 08版本:系统默认开启该功能。 Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。 |
| extraConfig | 额外的KafkaConsumer配置项目 | 否 | 不在可选配置项中,但是期望额外增加的配置。 |
Kafka08配置
Kafka08必选配置
| 参数 | 注释说明 | 是否必选 |
|---|---|---|
| group.id | 消费组ID | 是 |
| zookeeper.connect | zk链接地址 | 是 |
可选配置Key
consumer.idsocket.timeout.msfetch.message.max.bytesnum.consumer.fetchersauto.commit.enableauto.commit.interval.msqueued.max.message.chunksrebalance.max.retriesfetch.min.bytesfetch.wait.max.msrebalance.backoff.msrefresh.leader.backoff.msauto.offset.resetconsumer.timeout.msexclude.internal.topicspartition.assignment.strategyclient.idzookeeper.session.timeout.mszookeeper.connection.timeout.mszookeeper.sync.time.msoffsets.storageoffsets.channel.backoff.msoffsets.channel.socket.timeout.msoffsets.commit.max.retriesdual.commit.enabledpartition.assignment.strategysocket.receive.buffer.bytesfetch.min.bytes
Kafka09/Kafka010/Kafka011配置
Kafka09/Kafka010/Kafka011必选配置
| 参数 | 注释说明 |
|---|---|
| group.id | 消费组ID |
| bootstrap.servers | Kafka集群地址 |
Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。
Kafka09Kafka010Kafka011
当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocol、sasl.mechanism和sasl.jaas.config3个参数,示例如下。
create table kafka_stream( messageKey varbinary, `message` varbinary, topic varchar, `partition` int, `offset` bigint ) with ( type ='kafka010', topic = 'Kafka版本对应关系', `group.id` = ' ', ..., `security.protocol`='SASL_PLAINTEXT', `sasl.mechanism`='PLAIN', `sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username=" " password=" ";' );
| type | Kafka版本 |
|---|---|
| Kafka08 | 0.8.22 |
| Kafka09 | 0.9.0.1 |
| Kafka010 | 0.10.2.1 |
| Kafka011 | 0.11.0.2及以上 |
场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。
Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。
{
"name":"Alice",
"age":13,
"grade":"A"
}
方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK
Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。
CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT ) WITH ( type = 'kafka010' --请参见Kafka版本对应关系。 ); CREATE TABLE rds_sink ( `name` VARCHAR, age VARCHAR, grade VARCHAR ) WITH( type='rds' ); CREATE VIEW input_view AS SELECt CAST(`message` as VARCHAR ) as `message` FROM kafka_src; INSERT INTO rds_sink SELECt JSON_VALUE(`message`,'$.name'), JSON_VALUE(`message`,'$.age'), JSON_VALUE(`message`,'$.grade') FROM input_view;
方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink
针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。
SQL
-- 定义解析Kafka message的UDTF。 CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF'; -- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。 CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT ) WITH ( type = 'kafka010', --请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); CREATE TABLE rds_sink ( name VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP ) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='' ); -- 使用UDTF,将二进制数据解析成格式化数据。 CREATE VIEW input_view ( name, age, grade, updateTime ) AS SELECT T.name, T.age, T.grade, T.updateTime FROM kafka_src as S, LATERAL TABLE (kafkaparser (`message`)) as T ( name, age, grade, updateTime ); -- 使用解析出的格式化数据进行计算,并将结果输出到RDS。 INSERT INTO rds_sink SELECT name, age, grade, updateTime FROM input_view;
UDTF
Blink 2.2.4版本Maven依赖,示例如下。
org.apache.flink flink-core blink-2.2.4-SNAPSHOT provided org.apache.flink flink-streaming-java_2.11 blink-2.2.4-SNAPSHOT provided org.apache.flink flink-table_2.11 blink-2.2.4-SNAPSHOT provided com.alibaba fastjson 1.2.9
package com.alibaba; import com.alibaba.fastjson.JSONObject; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataTypes; import org.apache.flink.types.Row; import java.io.UnsupportedEncodingException; import java.sql.Timestamp; public class kafkaUDTF extends TableFunction{ public void eval(byte[] message) { try { String msg = new String(message, "UTF-8"); try { JSONObject data = JSON.parseObject(msg); if (data != null) { String name = data.getString("name") == null ? "null" : data.getString("name"); Integer age = data.getInteger("age") == null ? 0 : data.getInteger("age"); String grade = data.getString("grade") == null ? "null" : data.getString("grade"); Timestamp updateTime = data.getTimestamp("updateTime"); Row row = new Row(4); row.setField(0, name); row.setField(1, age); row.setField(2, grade); row.setField(3,updateTime ); System.out.println("Kafka message str ==>" + row.toString()); collect(row); } } catch (ClassCastException e) { System.out.println("Input data format error. Input data " + msg + "is not json string"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。 public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.createRowType(DataTypes.STRING, DataTypes.INT, DataTypes.STRING, DataTypes.TIMESTAMP); } }
场景2:从Kafka读取数据,输入实时计算Flink版进行窗口计算。
按照实时计算Flink版目前的设计,滚动或滑动等窗口操作,必须在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以Kafka中message字段中的Event Time进行窗口操作,需要先从message字段使用UDX解析出Event Time,才能定义Watermark。在Kafka源表场景中,需要使用计算列。例如Kafka中写入数据:2018-11-11 00:00:00|1|Anna|female 。计算流程为:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink。
方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK
Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。
CREATE TABLE kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ts as to_timestamp(json_value(cast(`message` as VARCHAR ),'$.nodes.time')), WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type = 'kafka' --请参见Kafka版本对应关系。 ); CREATE TABLE rds_sink ( starttime TIMESTAMP , endtime TIMESTAMP , `message` BIGINT ) WITH (type = 'rds'); INSERT INTO rds_sink SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE), TUMBLE_END(ts, INTERVAL '1' MINUTE), count(`message`) FROM kafka_src GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
方法2:Kafka SOURCE->UDTF->Realtime Compute for Apache Flink->RDS SINK
SQL
-- 定义解析Kafka message的UDTF。 CREATE FUNCTION kafkapaser AS 'com.alibaba.kafkaUDTF'; CREATE FUNCTION kafkaUDF AS 'com.alibaba.kafkaUDF'; -- 定义源表,注意:Kafka源表DDL字段必须与以下示例一模一样。WITH中参数可改。 create table kafka_src ( messageKey VARBINARY, `message` VARBINARY, topic VARCHAR, `partition` INT, `offset` BIGINT, ctime AS TO_TIMESTAMP(kafkaUDF(`message`)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意:计算列的类型必须为TIMESTAMP才能创建Watermark。 watermark for `ctime` as withoffset(`ctime`,0) -- 在计算列上定义Watermark。 ) WITH ( type = 'kafka010', -- 请参见Kafka版本对应关系。 topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); create table rds_sink ( `name` VARCHAR, age INT, grade VARCHAR, updateTime TIMESTAMP ) WITH( type='rds', url='jdbc:mysql://localhost:3306/test', tableName='test4', userName='test', password='' ); -- 使用UDTF,将二进制数据解析成格式化数据。 CREATE VIEW input_view AS SELECT S.ctime, T.`order`, T.`name`, T.sex from kafka_src as S, LATERAL TABLE (kafkapaser (`message`)) as T ( ctime, `order`, `name`, sex ); -- 对input_view中输出的数据做计算。 CREATE VIEW view2 ( cnt, sex ) AS SELECt COUNT(*) as cnt, T.sex from input_view Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE); -- 使用解析出的格式化数据进行计算,并将结果输出到RDS。 insert into rds_sink SELECt cnt,sex from view2;
UDF&UDTF
Blink 2.2.4版本Maven依赖,示例如下。
org.apache.flink flink-core blink-2.2.4-SNAPSHOT provided org.apache.flink flink-streaming-java_2.11 blink-2.2.4-SNAPSHOT provided org.apache.flink flink-table_2.11 blink-2.2.4-SNAPSHOT provided com.alibaba fastjson 1.2.9
UDTF
package com.alibaba; import com.alibaba.fastjson.JSONObject; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataTypes; import org.apache.flink.types.Row; import java.io.UnsupportedEncodingException; public class kafkaUDTF extends TableFunction{ public void eval(byte[] message) { try { // 读入一个二进制数据,并将其转换为String格式。 String msg = new String(message, "UTF-8"); // 提取JSON Object中各字段。 String ctime = Timestamp.valueOf(data.split('\|')[0]); String order = data.split('\|')[1]; String name = data.split('\|')[2]; String sex = data.split('\|')[3]; // 将解析出的字段放到要输出的Row()对象。 Row row = new Row(4); row.setField(0, ctime); row.setField(1, age); row.setField(2, grade); row.setField(3, updateTime); System.out.println("Kafka message str ==>" + row.toString()); // 输出一行。 collect(row); } catch (ClassCastException e) { System.out.println("Input data format error. Input data " + msg + "is not json string"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。 // 定义输出Row()对象的字段类型。 public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING); } }
UDF
package com.alibaba;
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class KafkaUDF extends ScalarFunction {
// 可选,open方法可以不写。
// 需要import org.apache.flink.table.functions.FunctionContext;
public String eval(byte[] message) {
// 读入一个二进制数据,并将其转换为String格式。
String msg = new String(message, "UTF-8");
return msg.split('\|')[0];
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
//可选,close方法可以不写。
@Override
public void close() {
}
}
自建Kafka
示例
create table kafka_stream( messageKey VARBINARY, `message` VARBINARY, topic varchar, `partition` int, `offset` bigint ) with ( type ='kafka011', topic = 'kafka_01', `group.id` = 'CID_blink', bootstrap.servers = '192.168.0.251:****' );
WITH参数
bootstrap.servers参数需要填写自建的地址和端口号。
仅在Blink 2.2.6及以上版本支持阿里云Kafka或自建Kafka显示TPS和RPS等指标信息。



