- Reference docs
- Environment
- pom
- flink lib files
- Demo
- Hive
Reference docs

Time attribute definitions:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/concepts/time_attributes/
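The demo below carries the event time in tid as a plain string. If a true event-time attribute with watermarks is needed (the topic of the doc above), the source DDL can instead declare a computed TIMESTAMP column. A minimal sketch, assuming tid is epoch milliseconds as in the commented-out line in the demo:

create table KafkaSourceTable (
  vin String,
  tid BIGINT,
  -- computed event-time column derived from tid (assumes tid is epoch millis)
  ts as to_timestamp(from_unixtime(tid / 1000, 'yyyy-MM-dd HH:mm:ss')),
  -- tolerate up to 5 seconds of out-of-orderness
  watermark for ts as ts - interval '5' second
) WITH (
  -- Kafka connector options as in the Demo section
  ...
)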
Environment

flink 1.13.1, java 1.8, scala 2.11, hive 2.1.1-cdh6.1.1, hadoop 3.0.0-cdh6.1.1

pom
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.13.1</flink.version>
    <scala.version>2.11</scala.version>
    <hive.version>2.1.1-cdh6.1.1</hive.version>
    <hadoop.version>3.0.0-cdh6.1.1</hadoop.version>
    <log4j.version>2.8.2</log4j.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>calcite-avatica</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>calcite-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>calcite-linq4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

flink lib files
antlr-runtime-3.5.2.jar
com.ibm.icu-4.4.2.jar
commons-cli-1.4.jar
flink-connector-hive_2.11-1.13.1.jar
flink-connector-kafka_2.11-1.13.1.jar
flink-core-1.13.0.jar
flink-csv-1.13.1.jar
flink-dist_2.11-1.13.1.jar
flink-json-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-hive-2.2.0_2.11-1.13.3.jar
flink-table-api-java-1.13.1.jar
flink-table-api-java-bridge_2.11-1.13.1.jar.0
flink-table-api-scala_2.11-1.13.1.jar.0
flink-table-planner_2.11-1.13.1.jar
flink-table-planner-blink_2.11-1.13.1.jar
flink-table-runtime-blink_2.11-1.13.1.jar
jindo-flink-sink-3.6.1.jar
jindofs-sdk-3.6.1.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar

Demo
package com.gwm.core.kafka
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import com.gwm.utils.log.Logging
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
object TestFlinkKafka extends Logging {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing: required, because the streaming Hive sink only commits files on completed checkpoints
env.enableCheckpointing(1000 * 60, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage("hdfs://test/checkpoint/")
// alternative checkpoint tuning:
// val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(5 * 60 * 1000)
// val checkpointConfig = env.getCheckpointConfig
// checkpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)
// checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)
// checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// use the blink planner
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
//stream table env
val tableEnv = StreamTableEnvironment.create(env, settings)
// Hive catalog
val hiveCatalogName: String = "kafka_hc"
val defaultDatabasesName: String = "test"
val hiveConfDir: String = "/alidata1/tmp/hivecf/"
val cataLog: HiveCatalog = new HiveCatalog(hiveCatalogName, defaultDatabasesName, hiveConfDir)
tableEnv.registerCatalog("kafka_hc", cataLog)
tableEnv.useCatalog(hiveCatalogName)
// Kafka source: create the source table with the default SQL dialect
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql("drop table if exists KafkaSourceTable")
// alternative, if tid were epoch milliseconds: ts as to_timestamp(from_unixtime(tid / 1000, 'yyyy-MM-dd HH:mm:ss'))
val createTableSql: String =
"""
|create table KafkaSourceTable(
| vin String,
| tid String,
| source String,
| datas String,
| ts as substr(concat('20', tid), 1, 8)
| ) WITH (
| 'connector' = 'kafka',
| 'topic' = 'topic_c',
| 'properties.bootstrap.servers' = 'ip:9092,ip2:9092,ip3:9092',
| 'properties.group.id' = 'flink-test-group',
| 'format' = 'json',
| 'json.ignore-parse-errors' = 'true',
| 'scan.startup.mode' = 'group-offsets'
|)
""".stripMargin
log.info(s"createTableSql: $createTableSql")
tableEnv.executeSql(createTableSql)
// Hive catalog: switch to the Hive dialect to create the sink table
tableEnv.useCatalog("kafka_hc")
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("test")
// val dropSql = ""
val hiveSql: String =
"""
|create table if not exists HiveSinkTable(
| vin String,
| tid String,
| source String,
| datas String )
| partitioned by (`date` String)
| row format delimited fields terminated by '\t'
|stored as orc
|tblproperties (
| 'orc.compress'='SNAPPY',
| 'partition.time-extractor.timestamp-pattern' = '$date 00:00:00',
| 'sink.partition-commit.trigger' = 'process-time',
| 'sink.partition-commit.policy.kind' = 'metastore,success-file'
|)
|""".stripMargin
tableEnv.executeSql(hiveSql)
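// notes on the tblproperties above: 'sink.partition-commit.trigger' = 'process-time'
// commits a partition based on processing time rather than watermarks, and the
// 'metastore,success-file' policy both adds the partition to the Hive metastore and
// writes a _SUCCESS marker file; data only becomes visible after a completed checkpoint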
// switch the SQL dialect back to default for the insert
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
val insertSql: String =
"""
|insert into HiveSinkTable
|select vin, tid, source, datas, from_unixtime(unix_timestamp(ts,'yyyyMMdd'),'yyyy-MM-dd') as `date`
|from KafkaSourceTable
|""".stripMargin
// alternative: select vin, tid, source, datas, date_format(ts, 'yyyy-MM-dd') as `date`
log.info(s"insertSql: $insertSql")
tableEnv.executeSql(insertSql)
// env.execute("kafka to hive")
}
}
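Note that tableEnv.executeSql(insertSql) already submits the streaming insert job, which is why the final env.execute stays commented out. If the submitting client should block until the job terminates, a minimal sketch using the returned TableResult:

// executeSql returns a TableResult; await() blocks the client until the job finishes
val result = tableEnv.executeSql(insertSql)
result.await()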
Hive
The tid upload timestamps are not standardized, so the partition values derived from them are inaccurate. A defensive filter is sketched below.
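One way to keep malformed tid values out of the partition column is to filter on the derived date string before inserting. A minimal sketch, assuming a valid ts is an eight-digit yyyyMMdd string; the REGEXP guard and its pattern are assumptions about the data, not taken from the original job:

// only insert rows whose derived date string looks like a plausible yyyyMMdd value
val guardedInsertSql: String =
  """
    |insert into HiveSinkTable
    |select vin, tid, source, datas,
    |       from_unixtime(unix_timestamp(ts, 'yyyyMMdd'), 'yyyy-MM-dd') as `date`
    |from KafkaSourceTable
    |where regexp(ts, '^20[0-9]{2}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$')
    |""".stripMargin
tableEnv.executeSql(guardedInsertSql)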