栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Blink SQL之创建消息队列Kafka源表

Blink SQL之创建消息队列Kafka源表

创建消息队列Kafka源表 注意事项

仅适用于Blink 2.0及以上版本。仅适用于独享模式。Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。二进制数据不支持本地调试,语法检查没有问题请进行线上调试。

从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。

DDL定义

Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。

create table kafka_stream(   --必须和Kafka源表中的5个字段的顺序和类型保持一致。
  messageKey VARBINARY,
  `message`    VARBINARY,
  topic      VARCHAR,
  `partition`  INT,
  `offset`     BIGINT        
) with (
  type ='kafka010',
  topic = '',
  `group.id` = '',
  ...
);
WITH参数

通用配置

参数注释说明是否必选备注
typeKafka对应版本Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。
topic读取的单个topic
topicPattern读取一批topic的表达式Topic用竖线(|)分隔。例如:topic1|topic2|topic3。
startupMode启动位点启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。
partitionDiscoveryIntervalMS定时检查是否有新分区产生Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。
extraConfig额外的KafkaConsumer配置项目不在可选配置项中,但是期望额外增加的配置。

Kafka08配置

Kafka08必选配置

参数注释说明是否必选
group.id消费组ID
zookeeper.connectzk链接地址

可选配置Key

consumer.idsocket.timeout.msfetch.message.max.bytesnum.consumer.fetchersauto.commit.enableauto.commit.interval.msqueued.max.message.chunksrebalance.max.retriesfetch.min.bytesfetch.wait.max.msrebalance.backoff.msrefresh.leader.backoff.msauto.offset.resetconsumer.timeout.msexclude.internal.topicspartition.assignment.strategyclient.idzookeeper.session.timeout.mszookeeper.connection.timeout.mszookeeper.sync.time.msoffsets.storageoffsets.channel.backoff.msoffsets.channel.socket.timeout.msoffsets.commit.max.retriesdual.commit.enabledpartition.assignment.strategysocket.receive.buffer.bytesfetch.min.bytes

Kafka09/Kafka010/Kafka011配置

Kafka09/Kafka010/Kafka011必选配置

参数注释说明
group.id消费组ID
bootstrap.serversKafka集群地址

Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。

Kafka09Kafka010Kafka011

当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocol、sasl.mechanism和sasl.jaas.config3个参数,示例如下。

create table kafka_stream(
  messageKey varbinary,
  `message` varbinary,
  topic varchar,
  `partition` int,
  `offset` bigint
) with (
  type ='kafka010',
  topic = '',
  `group.id` = '',
  ...,
  `security.protocol`='SASL_PLAINTEXT',
  `sasl.mechanism`='PLAIN',
  `sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";'
);
Kafka版本对应关系
typeKafka版本
Kafka080.8.22
Kafka090.9.0.1
Kafka0100.10.2.1
Kafka0110.11.0.2及以上
Kafka消息解析示例

场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。

Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。

{
  "name":"Alice",
  "age":13,
  "grade":"A"
}                

方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

CREATE TABLE kafka_src (
  messageKey  VARBINARY,
  `message`   VARBINARY,
  topic       VARCHAR,
  `partition` INT,
  `offset`    BIGINT
) WITH (
  type = 'kafka010'   --请参见Kafka版本对应关系。
);

CREATE TABLE rds_sink (
  `name`       VARCHAR,
  age         VARCHAR,
  grade       VARCHAR
) WITH(
  type='rds'
);

CREATE VIEW input_view AS 
  SELECt CAST(`message` as VARCHAR ) as `message`
FROM kafka_src;

INSERT INTO rds_sink
SELECt 
  JSON_VALUE(`message`,'$.name'),
  JSON_VALUE(`message`,'$.age'),
  JSON_VALUE(`message`,'$.grade')
FROM input_view;

方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink

针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。

SQL

-- 定义解析Kafka message的UDTF。
CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';

-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。
CREATE TABLE kafka_src (
  messageKey  VARBINARY,
  `message`   VARBINARY,
  topic       VARCHAR,
  `partition` INT,
  `offset`    BIGINT
) WITH (
  type = 'kafka010',    --请参见Kafka版本对应关系。
  topic = 'test_kafka_topic',
  `group.id` = 'test_kafka_consumer_group',
  bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
);

CREATE TABLE rds_sink (
  name       VARCHAR,
  age        INT,
  grade      VARCHAR,
  updateTime TIMESTAMP
) WITH(
  type='rds',
  url='jdbc:mysql://localhost:3306/test',
  tableName='test4',
  userName='test',
  password=''
);

-- 使用UDTF,将二进制数据解析成格式化数据。
CREATE VIEW input_view (
  name,
  age,
  grade,
  updateTime
) AS
SELECT
  T.name,
  T.age,
  T.grade,
  T.updateTime
FROM
  kafka_src as S,
  LATERAL TABLE (kafkaparser (`message`)) as T (
  name,
  age,
  grade,
  updateTime
);

-- 使用解析出的格式化数据进行计算,并将结果输出到RDS。
INSERT INTO rds_sink
SELECT 
  name,
  age,
  grade,
  updateTime
FROM input_view;                                

UDTF

Blink 2.2.4版本Maven依赖,示例如下。

    
        
            org.apache.flink
            flink-core
            blink-2.2.4-SNAPSHOT
            provided
        
        
            org.apache.flink
            flink-streaming-java_2.11
            blink-2.2.4-SNAPSHOT
            provided
        
        
            org.apache.flink
            flink-table_2.11
            blink-2.2.4-SNAPSHOT
            provided
        
        
            com.alibaba
            fastjson
            1.2.9
        
    
package com.alibaba;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;

public class kafkaUDTF extends TableFunction {
    public void eval(byte[] message) {
        try {
            
            String msg = new String(message, "UTF-8");
            try {
                JSONObject data = JSON.parseObject(msg);
                if (data != null) {
                    String name = data.getString("name") == null ? "null" : data.getString("name");
                    Integer age = data.getInteger("age") == null ? 0 : data.getInteger("age");
                    String grade = data.getString("grade") == null ? "null" : data.getString("grade");
                    Timestamp updateTime = data.getTimestamp("updateTime");

                    Row row = new Row(4);
                    row.setField(0, name);
                    row.setField(1, age);
                    row.setField(2, grade);
                    row.setField(3,updateTime );

                    System.out.println("Kafka message str ==>" + row.toString());
                    collect(row);
                }
            } catch (ClassCastException e) {
                System.out.println("Input data format error. Input data " + msg + "is not json string");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(DataTypes.STRING, DataTypes.INT, DataTypes.STRING, DataTypes.TIMESTAMP);
    }
}                                

场景2:从Kafka读取数据,输入实时计算Flink版进行窗口计算。

按照实时计算Flink版目前的设计,滚动或滑动等窗口操作,必须在源表DDL上定义Watermark。Kafka源表比较特殊。如果要以Kafka中message字段中的Event Time进行窗口操作,需要先从message字段使用UDX解析出Event Time,才能定义Watermark。在Kafka源表场景中,需要使用计算列。例如Kafka中写入数据:2018-11-11 00:00:00|1|Anna|female 。计算流程为:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink。

方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

CREATE TABLE kafka_src (
  messageKey VARBINARY,
  `message` VARBINARY,
  topic VARCHAR,
  `partition` INT,
  `offset` BIGINT,
  ts as to_timestamp(json_value(cast(`message` as VARCHAR ),'$.nodes.time')),
  WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (type = 'kafka' --请参见Kafka版本对应关系。
);

CREATE TABLE rds_sink (
  starttime TIMESTAMP ,
  endtime   TIMESTAMP ,
  `message` BIGINT 
) WITH (type = 'rds');

INSERT
  INTO rds_sink
SELECT
  TUMBLE_START(ts, INTERVAL '1' MINUTE),
  TUMBLE_END(ts, INTERVAL '1' MINUTE),
  count(`message`)
FROM
  kafka_src
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);

方法2:Kafka SOURCE->UDTF->Realtime Compute for Apache Flink->RDS SINK

SQL

-- 定义解析Kafka message的UDTF。
CREATE FUNCTION kafkapaser AS 'com.alibaba.kafkaUDTF';
CREATE FUNCTION kafkaUDF AS 'com.alibaba.kafkaUDF';

-- 定义源表,注意:Kafka源表DDL字段必须与以下示例一模一样。WITH中参数可改。
create table kafka_src (
  messageKey VARBINARY,
  `message` VARBINARY,
  topic VARCHAR,
  `partition` INT,
  `offset` BIGINT,
  ctime AS TO_TIMESTAMP(kafkaUDF(`message`)), -- 定义计算列,计算列可理解为占位符,源表中并没有这一列,其中的数据可经过下游计算得出。注意:计算列的类型必须为TIMESTAMP才能创建Watermark。
  watermark for `ctime` as withoffset(`ctime`,0) -- 在计算列上定义Watermark。
) WITH (
  type = 'kafka010', -- 请参见Kafka版本对应关系。
  topic = 'test_kafka_topic',
  `group.id` = 'test_kafka_consumer_group',
  bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
);

create table rds_sink (
  `name` VARCHAR,
  age INT,
  grade VARCHAR,
  updateTime TIMESTAMP
) WITH(
  type='rds',
  url='jdbc:mysql://localhost:3306/test',
  tableName='test4',
  userName='test',
  password=''
);


-- 使用UDTF,将二进制数据解析成格式化数据。
CREATE VIEW input_view AS
SELECT
  S.ctime,
  T.`order`,
  T.`name`,
  T.sex
  from
  kafka_src as S,
  LATERAL TABLE (kafkapaser (`message`)) as T (
  ctime,
  `order`,
  `name`,
  sex
);

-- 对input_view中输出的数据做计算。
CREATE VIEW view2 (
  cnt,
  sex
) AS
  SELECt
  COUNT(*) as cnt,
  T.sex
  from
  input_view
Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);

-- 使用解析出的格式化数据进行计算,并将结果输出到RDS。
insert into rds_sink
  SELECt
  cnt,sex
from view2;

UDF&UDTF

Blink 2.2.4版本Maven依赖,示例如下。

  
        
            org.apache.flink
            flink-core
            blink-2.2.4-SNAPSHOT
            provided
        
        
            org.apache.flink
            flink-streaming-java_2.11
            blink-2.2.4-SNAPSHOT
            provided
        
        
            org.apache.flink
            flink-table_2.11
            blink-2.2.4-SNAPSHOT
            provided
        
        
            com.alibaba
            fastjson
            1.2.9
        
                                    

UDTF

package com.alibaba;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import java.io.UnsupportedEncodingException;


public class kafkaUDTF extends TableFunction {

    public void eval(byte[] message) {
        try {
          // 读入一个二进制数据,并将其转换为String格式。
            String msg = new String(message, "UTF-8");

                // 提取JSON Object中各字段。
                    String ctime = Timestamp.valueOf(data.split('\|')[0]);
                    String order = data.split('\|')[1];
                    String name = data.split('\|')[2];
                    String sex = data.split('\|')[3];

                    // 将解析出的字段放到要输出的Row()对象。
                    Row row = new Row(4);
                    row.setField(0, ctime);
                    row.setField(1, age);
                    row.setField(2, grade);
                    row.setField(3, updateTime);

                    System.out.println("Kafka message str ==>" + row.toString());

                    // 输出一行。
                    collect(row);

            } catch (ClassCastException e) {
                System.out.println("Input data format error. Input data " + msg + "is not json string");
            }


        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }

    @Override
    // 如果返回值是Row,重新加载UDTF这个方法,并指明系统返回的字段类型。
    // 定义输出Row()对象的字段类型。
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING);
    }

}

UDF

package com.alibaba;
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class KafkaUDF extends ScalarFunction {
    // 可选,open方法可以不写。
    // 需要import org.apache.flink.table.functions.FunctionContext;

    public String eval(byte[] message) {

         // 读入一个二进制数据,并将其转换为String格式。
        String msg = new String(message, "UTF-8");
        return msg.split('\|')[0];
    }
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }
    //可选,close方法可以不写。
    @Override
    public void close() {
        }
}                                        
自建Kafka

示例

create table kafka_stream(
  messageKey VARBINARY,
  `message` VARBINARY, 
  topic varchar,
  `partition` int,
  `offset` bigint
) with (
  type ='kafka011',
  topic = 'kafka_01',
  `group.id` = 'CID_blink',
  bootstrap.servers = '192.168.0.251:****'
);                

WITH参数

bootstrap.servers参数需要填写自建的地址和端口号。

仅在Blink 2.2.6及以上版本支持阿里云Kafka或自建Kafka显示TPS和RPS等指标信息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729001.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号