- pom
- code
- 运行效果图
解决了 SQL 长度超过 64k 限制的问题
修复了其他 bug

pom
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Reconstructed pom.xml: the original text had every XML tag stripped.
  Element placement is inferred from the surviving text content and standard
  Maven archetype layout - NOTE(review): groupId was lost in the mangling;
  confirm the real value. Versions below are taken verbatim from the
  surviving tokens (Flink 1.14.0, Scala 2.11, CDH 6.1.1 Hive/Hadoop).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- TODO confirm: original groupId was destroyed by the formatting loss -->
  <groupId>org.example</groupId>
  <artifactId>flink-sql_1.14</artifactId>
  <version>1.0</version>

  <name>flink-sql_1.14</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <flink.version>1.14.0</flink.version>
    <scala.version>2.11</scala.version>
    <hive.version>2.1.1-cdh6.1.1</hive.version>
    <hadoop.version>3.0.0-cdh6.1.1</hadoop.version>
  </properties>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.4</version>
    </dependency>

    <!-- Versions now reference the properties instead of repeating 1.14.0 / _2.11
         literals; the duplicate flink-table-api-java-bridge_2.11 dependency that
         appeared twice in the original has been removed. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
      <exclusions>
        <!-- Flink's table planner ships its own shaded Calcite; exclude Hive's copy -->
        <exclusion>
          <artifactId>calcite-avatica</artifactId>
          <groupId>org.apache.calcite</groupId>
        </exclusion>
        <exclusion>
          <artifactId>calcite-core</artifactId>
          <groupId>org.apache.calcite</groupId>
        </exclusion>
        <exclusion>
          <artifactId>calcite-linq4j</artifactId>
          <groupId>org.apache.calcite</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>commons-math3</artifactId>
          <groupId>org.apache.commons</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <scalaCompatVersion>${scala.version}</scalaCompatVersion>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIterator
object TestSelect extends Logging {

  /**
   * Runs a `SELECT * FROM test.student` against Hive through Flink's
   * HiveCatalog and logs every returned row.
   *
   * Fixes over the original:
   *  - the original called `result.print()` AND `result.collect()`; a SELECT
   *    TableResult may be consumed only once, so the second consumption saw
   *    an already-drained result. We now consume it exactly once.
   *  - the CloseableIterator is now closed in a finally block (the original
   *    leaked it, which can keep the Flink job/session alive).
   *  - `useBlinkPlanner()` dropped: it is deprecated in Flink 1.14, where the
   *    Blink planner is the only planner.
   */
  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().build()
    val tEnv: TableEnvironment = TableEnvironment.create(settings)

    val catalogName = "hc"
    val defaultDatabase = "gdc"
    // NOTE(review): presumably the directory holding hive-site.xml - confirm path.
    val hiveConfDir = "/dirpath"

    val hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir)
    tEnv.registerCatalog(catalogName, hiveCatalog)
    tEnv.useCatalog(catalogName)
    // Hive dialect so Hive-flavoured SQL (and Hive tables) resolve correctly.
    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    val sql =
      """
        |select * from test.student
        |""".stripMargin

    val result: TableResult = tEnv.executeSql(sql)

    // Single consumption of the result; close the iterator to release the job.
    val rows: CloseableIterator[Row] = result.collect()
    try {
      while (rows.hasNext) {
        log.info("row: " + rows.next().toString)
      }
    } finally {
      rows.close()
    }
  }
}
运行效果图



