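Written on 2021-02-25.
According to the official documentation, the flow from a DDL-defined data source to execution is as follows:
(Figure: image from the official site.)
So once the dynamic tables have been created, checking whether the FlinkSQL statement to be executed is syntactically correct should happen in the Planning stage.
Producers and consumers
FlinkSQL: creating Kafka producers and consumers with DDL
SQL syntax validation in a Git project
Drawback: there is no context; it is a purely syntactic check.
https://github.com/zhp8341/flink-streaming-platform-web
For the code in this part, see: https://www.jianshu.com/p/8b878ce64da9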
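CalciteParser parser = new CalciteParser(getSqlParserConfig(tEnv.getConfig()));
try {
    parser.parse(sql);
} catch (Exception e) {
    // Modified by the author: keep only the text between the first ": " and "Was".
    // Note: this assumes the exception text contains ": ".
    System.out.println("Syntax error: " + e.toString().split(": ")[1].split("Was")[0]);
}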
private static SqlParser.Config getSqlParserConfig(TableConfig tableConfig) {
    return JavaScalaConversionUtil.toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
            () -> {
                SqlConformance conformance = getSqlConformance(tableConfig);
                return SqlParser
                        .config()
                        .withParserFactory(FlinkSqlParserFactories.create(conformance))
                        .withConformance(conformance)
                        .withLex(Lex.JAVA)
                        .withIdentifierMaxLength(256);
            }
    );
}

private static CalciteConfig getCalciteConfig(TableConfig tableConfig) {
    return TableConfigUtils.getCalciteConfig(tableConfig);
}

private static FlinkSqlConformance getSqlConformance(TableConfig tableConfig) {
    SqlDialect sqlDialect = tableConfig.getSqlDialect();
    switch (sqlDialect) {
        case HIVE:
            return FlinkSqlConformance.HIVE;
        case DEFAULT:
            return FlinkSqlConformance.DEFAULT;
        default:
            throw new TableException("Unsupported SQL dialect: " + sqlDialect);
    }
}
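The catch block above shortens the exception text with two chained split calls, which throws an ArrayIndexOutOfBoundsException when the text contains no ": ". A minimal sketch of a more defensive version follows. The class name SqlErrorSummary and the sample message are hypothetical, and the behaviour differs slightly from the original: it keeps the whole message up to "Was expecting" rather than only the segment between the first two ": " separators.

```java
final class SqlErrorSummary {

    private SqlErrorSummary() {}

    /**
     * Shortens text of the form "<exception class>: <message> Was expecting ..."
     * to "<message>". Text without those markers is returned unchanged (trimmed).
     */
    static String summarize(String exceptionText) {
        // Drop the leading "<exception class>: " prefix if there is one.
        int start = exceptionText.indexOf(": ");
        String message = start >= 0 ? exceptionText.substring(start + 2) : exceptionText;
        // Drop the (often very long) "Was expecting ..." token list if there is one.
        int end = message.indexOf("Was expecting");
        if (end >= 0) {
            message = message.substring(0, end);
        }
        return message.trim();
    }

    public static void main(String[] args) {
        // Hypothetical sample, shaped like a parser error message.
        String sample = "org.example.ParseException: Encountered \"FORM\" at line 1, column 10.\n"
                + "Was expecting one of:\n    <EOF>";
        System.out.println("Syntax error: " + summarize(sample));
    }
}
```

In the catch block, `summarize(e.toString())` would take the place of the chained split calls.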
StreamTableEnvironment: static create()
  -> StreamTableEnvironmentImpl: static create() -> return new StreamTableEnvironmentImpl
org.apache.flink.table.delegation.Planner planner = tableEnvironment.getPlanner();
Parser parser = tableEnvironment.getPlanner().getParser();
List<Operation> operation = parser.parse(executeSql);
planner.translate(Collections.singletonList((ModifyOperation) operation.get(0)));
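When we execute FlinkSQL, we catch the error raised during parsing, and that gives us our validator.
Drawback: the line number reported in the error does not match the line number in CodeMirror. See also: CodeMirror auto-completion (with a method for completing dynamic tables [setting JS objects dynamically]).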
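The text does not say why the line numbers disagree. One plausible cause, and this is an assumption, is that each statement is parsed on its own, so the parser reports a line relative to the start of that statement rather than to the whole editor buffer. Under that assumption, a rough sketch of the correction is to read the "at line N, column M" fragment from the error message and add the statement's starting line. The class name EditorLineMapper and the statementStartLine parameter are hypothetical.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EditorLineMapper {

    // Matches the "at line N, column M" fragment of a parser error message.
    private static final Pattern POSITION = Pattern.compile("at line (\\d+), column (\\d+)");

    private EditorLineMapper() {}

    /**
     * @param errorMessage       text of the parse error
     * @param statementStartLine 1-based editor line on which the failing statement begins
     *                           (assumed to be tracked by the caller when splitting the script)
     * @return the 1-based editor line of the error, or empty if the message has no position
     */
    static OptionalInt toEditorLine(String errorMessage, int statementStartLine) {
        Matcher m = POSITION.matcher(errorMessage);
        if (!m.find()) {
            return OptionalInt.empty();
        }
        int lineInStatement = Integer.parseInt(m.group(1)); // 1-based, relative to the statement
        return OptionalInt.of(statementStartLine + lineInStatement - 1);
    }
}
```

CodeMirror's programmatic positions count lines from 0 while the gutter shows them from 1, so subtract 1 from the result before handing it to the editor API.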
At run time, the call passes through:
TableEnvironmentImpl.java:666
TableEnvironmentImpl.java:767
TableEnvironmentImpl.java:676
TableEnvironmentImpl.java:1329
From this we can see that the method ultimately called is:
planner.translate(modifyOperations);
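Given the StreamTableEnvironment (STE) setup, we need to get hold of exactly its planner object. In the TableEnvironmentImpl class:
So the question is: how do we obtain the planner that is passed in as an argument?
Go back to the interface, where the TableEnvironment is created: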
StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);
This calls the static create() method of StreamTableEnvironmentImpl.
Note its return type:
public static StreamTableEnvironment create
Inside it we find:
Planner planner =
        ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                .create(
                        plannerProperties,
                        executor,
                        tableConfig,
                        functionCatalog,
                        catalogManager);
That is too complicated to reproduce by hand.
But notice:
public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
implements StreamTableEnvironment {
public interface TableEnvironmentInternal extends TableEnvironment {
And TableEnvironmentImpl, which implements TableEnvironmentInternal, has:
protected final Planner planner;
It is assigned in the same way as in StreamTableEnvironmentImpl, and a getter exposes it:
@VisibleForTesting
public Planner getPlanner() {
return planner;
}
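Because getPlanner() lives on the implementation class rather than on the public interface, the caller has to downcast. The sketch below shows that pattern with an explicit type check, so an unexpected implementation fails with a clear message instead of a bare ClassCastException. Every type in it (Planner, TableEnv, TableEnvImpl, PlannerAccess) is a hypothetical stand-in reduced to what matters here, not the real Flink class.

```java
// Hypothetical stand-ins for the real types, reduced to the parts that matter here.
interface Planner {}

interface TableEnv {}                      // plays the role of the public interface

class TableEnvImpl implements TableEnv {   // plays the role of the implementation class
    protected final Planner planner;

    TableEnvImpl(Planner planner) {
        this.planner = planner;
    }

    public Planner getPlanner() {
        return planner;
    }
}

final class PlannerAccess {

    private PlannerAccess() {}

    /** Returns the planner if env is the expected implementation; otherwise fails with a clear message. */
    static Planner plannerOf(TableEnv env) {
        if (!(env instanceof TableEnvImpl)) {
            throw new IllegalStateException(
                    "Unexpected environment type: " + env.getClass().getName());
        }
        return ((TableEnvImpl) env).getPlanner();
    }
}
```

The check costs one line and makes the dependency on an internal class explicit, which is useful because a getter marked @VisibleForTesting is not a stable public contract.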
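And if there were no getter? Then we would have no choice but to build the planner the complicated way. That is the value of good architecture.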
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        // .useOldPlanner() // flink
        .useBlinkPlanner() // blink
        .build();
// The cast is needed because getPlanner() is declared on the implementation class.
StreamTableEnvironmentImpl ste = (StreamTableEnvironmentImpl) StreamTableEnvironment.create(env, settings);
org.apache.flink.table.delegation.Planner planner = ste.getPlanner();
Parser parser = planner.getParser();
try {
    // Parse first, then translate: translation surfaces errors that parsing alone does not.
    List<Operation> operation = parser.parse(executeSql.get(0));
    planner.translate(Collections.singletonList((ModifyOperation) operation.get(0)));
} catch (Exception e) {
    System.err.println(e.toString());
    // Keep the text between the first ": " and "Was"; fall back to the full text if there is no ": ".
    String[] parts = e.toString().split(": ");
    errInfo = "Syntax error: " + (parts.length > 1 ? parts[1].split("Was")[0] : e.toString());
    errInfo = parts.length > 2 ? errInfo + " : " + parts[2] : errInfo;
    throw new Exception(errInfo);
}



