栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkSQL动态语法验证器提取

FlinkSQL动态语法验证器提取

前言

本文编写2021.2.25
从官网的资料来看,DDL数据源到运行流程如下:

图片来源官网

那么关于建立动态表后,执行的FlinkSQL语句的语法是否正确,我们应当在Planning中进行。


生产消费者

FlinkSQL使用DDL创建Kafka生产和消费者


Git项目的SQL语句语法校验

缺点:无上下文环境,单纯的语法校验。

https://github.com/zhp8341/flink-streaming-platform-web

关于该部分代码:https://www.jianshu.com/p/8b878ce64da9

CalciteParser parser = new CalciteParser(getSqlParserConfig(tEnv.getConfig()));
try {
     parser.parse(sql);
} catch (Exception e) {
	System.out.println("出现语法错误:" + e.toString().split(": ")[1].split("Was")[0]);	//编者改
}
    private static SqlParser.Config getSqlParserConfig(TableConfig tableConfig) {
        return JavaScalaConversionUtil.toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
                () -> {
                    SqlConformance conformance = getSqlConformance(tableConfig);
                    return SqlParser
                            .config()
                            .withParserFactory(FlinkSqlParserFactories.create(conformance))
                            .withConformance(conformance)
                            .withLex(Lex.JAVA)
                            .withIdentifierMaxLength(256);
                }
        );
    }

    private static CalciteConfig getCalciteConfig(TableConfig tableConfig) {
        return TableConfigUtils.getCalciteConfig(tableConfig);
    }
    
    private static FlinkSqlConformance getSqlConformance(TableConfig tableConfig) {
        SqlDialect sqlDialect = tableConfig.getSqlDialect();
        switch (sqlDialect) {
            case HIVE:
                return FlinkSqlConformance.HIVE;
            case DEFAULT:
                return FlinkSqlConformance.DEFAULT;
            default:
                throw new TableException("Unsupported SQL dialect: " + sqlDialect);
        }
    }



SQL验证:
StreamTableEnvironment  static create()  I
StreamTableEnvironmentImp static create()  -> return new StreamTableEnvironmentImpl

org.apache.flink.table.delegation.Planner planner = tableEnvironment.getPlanner();
Parser parser = tableEnvironment.getPlanner().getParser();
List operation = parser.parse(executeSql);
planner.translate(Collections.singletonList((ModifyOperation) operation.get(0)));



实现思路:

当我们执行FlinkSQL的时候,会捕获解析报错信息,由此获取验证器。

缺点:返回错误行数与CodeMirror行数不一致。coderMirror自动联想补全(附动态表补全方法【JS对象动态设置】)



定位验证器过程

运行:


TableEnvironmentImpl.java:666


TableEnvironmentImpl.java:767


TableEnvironmentImpl.java:676


TableEnvironmentImpl.java:1329


由此可知最终方法为:

 planner.translate(modifyOperations);



获取对象

考虑到STE的环境,要精确获取到planner对象才行:TableEnvironmentImpl类中:

那么问题来了,传参进来的planner怎么获取呢?


返回到接口:TableEnvironment的创建:

StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

调用TableEnvironmentImpl的静态方法create()。


我们注意到返回值:

public static StreamTableEnvironment create

有:

        Planner planner =
                ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(
                                plannerProperties,
                                executor,
                                tableConfig,
                                functionCatalog,
                                catalogManager);

过于复杂:


但是注意到:

public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
        implements StreamTableEnvironment {
public interface TableEnvironmentInternal extends TableEnvironment {

而TableEnvironmentInternal中有:

   protected final Planner planner;

赋值如StreamTableEnvironmentImpl一样。

    @VisibleForTesting
    public Planner getPlanner() {
        return planner;
    }

如果没有呢? 那就只能复杂创建了,论好架构的作用性。



具体实现
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
                // .useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironmentImpl ste = (StreamTableEnvironmentImpl) StreamTableEnvironment.create(env, settings);


		
        org.apache.flink.table.delegation.Planner planner = ste.getPlanner();
        Parser parser = ste.getPlanner().getParser();
        try {
            List operation = parser.parse(executeSql.get(0));
            planner.translate(Collections.singletonList((ModifyOperation) operation.get(0)));
        } catch (Exception e) {
            // TODO: handle exception
            System.err.println(e.toString());
            errInfo = "出现语法错误:" + e.toString().split(": ")[1].split("Was")[0];
            errInfo = e.toString().split(": ").length > 2 ? errInfo + " : " + e.toString().split(": ")[2] : errInfo;
            throw new Exception(errInfo);
        }



效果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278457.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号