
Flink SQL: Custom Kafka Connector



Contents
  • Flink SQL custom Kafka connector
    • 1. Overview
    • 2. Custom Kafka connector
      • 2.2 Maven dependencies
      • 2.3 Custom Factory
      • 2.4 Test

Flink SQL custom Kafka connector

On a streaming compute platform, the connector parameters of a Flink SQL job (cluster addresses, credentials, and so on) often must not be exposed to end users, and the connectors shipped with Flink do not always satisfy the business scenario or the product's own requirements. Both problems can be solved either by modifying the Flink source code or, as shown here, by writing a custom connector.

1. Overview

Source-code architecture

Source walkthrough: FactoryUtil.discoverFactory looks up all connector factories, including custom ones; the matched factory is then invoked via factory.createDynamicTableSource(context).
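The planner-side lookup can be sketched roughly as follows (Flink 1.12 API; this is the framework's own resolution path, not code you normally write yourself):

```java
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

// FactoryUtil loads every org.apache.flink.table.factories.Factory
// registered via the JDK ServiceLoader (META-INF/services) and matches
// on the string returned by factoryIdentifier().
DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
        Thread.currentThread().getContextClassLoader(),
        DynamicTableSourceFactory.class,
        "mine-kafka"); // the 'connector' value from the DDL

// The planner then asks the matched factory for the actual source:
// DynamicTableSource source = factory.createDynamicTableSource(context);
```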

2. Custom Kafka connector

Requirement: add a Kafka connector parameter connection-name; the factory uses it in an HTTP request to the data-security platform to fetch the real connection details.
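With the finished connector, a table definition could then hide the bootstrap servers behind the new option; the connection name and topic below are made-up placeholders:

```sql
-- Hypothetical DDL: 'connection-name' replaces a plain-text
-- 'properties.bootstrap.servers'; the factory resolves it over HTTP.
CREATE TABLE orders (
  id   INT,
  name STRING
) WITH (
  'connector'         = 'mine-kafka',
  'topic'             = 'orders-topic',
  'connection-name'   = 'prod-kafka-01',   -- placeholder name
  'scan.startup.mode' = 'earliest-offset',
  'format'            = 'csv'
);
```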

2.2 Maven dependencies

Add dependencies according to your own development scenario:

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.11</scala.version>
        <flink.connector.jdbc.version>1.12.7</flink.connector.jdbc.version>
        <mysql.connector.java.version>5.1.38</mysql.connector.java.version>
        <flink.csv.version>1.12.7</flink.csv.version>
        <google.guava.version>20.0</google.guava.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.connector.jdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connector.java.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.csv.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${google.guava.version}</version>
        </dependency>
    </dependencies>
2.3 Custom Factory
  1. DynamicTableSourceFactory: every custom connector factory must implement this interface (here we extend KafkaDynamicTableFactory, which already does).
  2. Create a file named org.apache.flink.table.factories.Factory under resources/META-INF/services and add the fully qualified class name of the custom factory to it. Note that the file name must be exactly org.apache.flink.table.factories.Factory, otherwise the classLoader cannot find the factory.
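For this factory the SPI file contains a single line with the fully qualified class name (the package below is an assumption, since the article's snippet omits the package declaration):

```
# resources/META-INF/services/org.apache.flink.table.factories.Factory
pupu.flink.connector.MineKafkaDynamicTableFactory
```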
import com.alibaba.fastjson.JSONObject;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import pupu.flink.constan.Constant;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.*;



@Internal
@Slf4j
public class MineKafkaDynamicTableFactory extends KafkaDynamicTableFactory {

    // reflection handle, used to invoke the parent class's (static) private methods
    private final Class<?> clazz = KafkaDynamicTableFactory.class;

    // maps the value of 'connector' in the SQL WITH clause
    public static final String IDENTIFIER = "mine-kafka";

    public static final ConfigOption<String> CONNECTION_NAME = ConfigOptions
            .key("connection-name")
            .stringType()
            .noDefaultValue()
            .withDescription("Name used to look up the Kafka cluster connection parameters.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        return options;
    }

    
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig tableOptions = helper.getOptions();

        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                invokeGetKeyDecodingFormat(helper);

        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                invokeGetValueDecodingFormat(helper);

        helper.validateExcept(PROPERTIES_PREFIX);

        validateTableSourceOptions(tableOptions);

        invokeValidatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        final StartupOptions startupOptions = getStartupOptions(tableOptions);

        final Properties properties = getPupuKafkaProperties(context);

        // add topic-partition discovery
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(tableOptions
                        .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                        .map(Duration::toMillis)
                        .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

        final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                getSourceTopics(tableOptions),
                getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);

    }

    
    public Properties getPupuKafkaProperties(Context context) {
        final Properties kafkaProperties = new Properties();

        Map<String, String> tableOptions = context.getCatalogTable().getOptions();

        if (invokeHasKafkaClientProperties(tableOptions)) {
            tableOptions.keySet().stream()
                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
                    .forEach(key -> {
                        final String value = tableOptions.get(key);
                        final String subKey = key.substring((PROPERTIES_PREFIX).length());
                        kafkaProperties.put(subKey, value);
                    });
        }
        httpMetadata(context, kafkaProperties);
        return kafkaProperties;
    }

    
    private void httpMetadata(Context context, Properties kafkaProperties) {

        // read the custom job-level parameters
        ReadableConfig configuration = context.getConfiguration();
        String metaUrl = configuration.get(ConfigOptions.key(Constant.HTTP_PARAMS_METAURL).stringType().defaultValue("metaUrl"));
        String metaSecret = configuration.get(ConfigOptions.key(Constant.HTTP_PARAMS_METASECRET).stringType().defaultValue("metaSecret"));
        String metaDbSecret = configuration.get(ConfigOptions.key(Constant.HTTP_PARAMS_METADBSECRET).stringType().defaultValue("metaDbSecret"));

        // validate that the parameters are non-blank
        if (StringUtils.isAnyBlank(metaUrl, metaSecret, metaDbSecret)) {
            throw new ValidationException(
                    String.format("Please check that the parameters [%s, %s, %s] are set correctly!",
                            Constant.HTTP_PARAMS_METAURL,
                            Constant.HTTP_PARAMS_METASECRET,
                            Constant.HTTP_PARAMS_METADBSECRET));
        }
        String connectName = context.getCatalogTable().getOptions().get(CONNECTION_NAME.key());

        Optional<JSONObject> jsonObjectOptions = toHttp(metaUrl, metaSecret, metaDbSecret, connectName);

        if (jsonObjectOptions.isPresent()) {
            final String key = "url";
            final String bootStrapServers = "bootstrap.servers";
            JSONObject jsonObject = jsonObjectOptions.get();
            if(jsonObject.containsKey(key)) {
                kafkaProperties.put(bootStrapServers, jsonObject.getString(key));
            }
        }
    }

    
    @SneakyThrows
    private Optional<DecodingFormat<DeserializationSchema<RowData>>> invokeGetKeyDecodingFormat(FactoryUtil.TableFactoryHelper helper) {
        final String methodName = "getKeyDecodingFormat";
        Method method = clazz.getDeclaredMethod(methodName, FactoryUtil.TableFactoryHelper.class);
        method.setAccessible(true);
        Object value = method.invoke(this, helper);
        return (Optional<DecodingFormat<DeserializationSchema<RowData>>>) value;
    }

    
    @SneakyThrows
    private DecodingFormat<DeserializationSchema<RowData>> invokeGetValueDecodingFormat(FactoryUtil.TableFactoryHelper helper) {
        final String methodName = "getValueDecodingFormat";
        Method method = clazz.getDeclaredMethod(methodName, FactoryUtil.TableFactoryHelper.class);
        method.setAccessible(true);
        Object value = method.invoke(this, helper);
        return (DecodingFormat<DeserializationSchema<RowData>>) value;
    }

    
    @SneakyThrows
    private void invokeValidatePKConstraints(ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        final String methodName = "validatePKConstraints";
        Method method = clazz.getDeclaredMethod(methodName, ObjectIdentifier.class, CatalogTable.class, Format.class);
        method.setAccessible(true);
        method.invoke(this, tableName, catalogTable, format);
    }


    
    @SneakyThrows
    private boolean invokeHasKafkaClientProperties(Map<String, String> tableOptions) {
        final Class<?> clazz = KafkaOptions.class;
        final String methodName = "hasKafkaClientProperties";
        Method method = clazz.getDeclaredMethod(methodName, Map.class);
        method.setAccessible(true);
        // static method: the invocation target may be null
        return (Boolean) method.invoke(null, tableOptions);
    }

    
    private Optional<JSONObject> toHttp(String url, String secret, String dbSecret, String connectName) {

        // business-specific logic: call the data-security platform here
        System.out.println("MineKafkaDynamicTableFactory enhancement invoked");
        return Optional.empty();

    }

}
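The invoke* helpers above all follow the same standard JDK reflection pattern: look up a private method, make it accessible, and invoke it. In isolation (plain JDK, no Flink required, all names here are illustrative):

```java
import java.lang.reflect.Method;

// A stand-in for KafkaDynamicTableFactory with a private method.
class Parent {
    private String secret(String name) {
        return "hello " + name;
    }
}

public class ReflectDemo {
    public static void main(String[] args) throws Exception {
        // getDeclaredMethod sees private members, unlike getMethod
        Method m = Parent.class.getDeclaredMethod("secret", String.class);
        m.setAccessible(true); // bypass the private modifier
        String result = (String) m.invoke(new Parent(), "flink");
        System.out.println(result); // prints "hello flink"
    }
}
```

The factory uses @SneakyThrows to swallow the checked NoSuchMethodException and IllegalAccessException that this pattern can throw.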


2.4 Test
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import pupu.flink.pupu.bean.User;

public class MineKafkaDynamicTableFactoryTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // job-level parameters (e.g. the metaUrl/metaSecret keys read in httpMetadata) are set here; "abc" is a placeholder
        tableEnv.getConfig().getConfiguration().setString("abc", "abc");

        String kafkaConnectorSql = "CREATE TABLE kafkaTable (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'mine-kafka',\n" +
                "  'topic' = 'csv-message',\n" +
                "  'properties.bootstrap.servers' = 'xxxx',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")";

        tableEnv.executeSql(kafkaConnectorSql);

        Table table = tableEnv.sqlQuery("select * from kafkaTable");

        tableEnv.toAppendStream(table, User.class).print();

        env.execute();

    }

}

Result: the Kafka source goes through the custom connector.
