代码如下所示:
/**
 * Streaming job: reads JSON events from a Kafka topic and writes them into a
 * partitioned Hive table (Parquet), committing partitions by event time.
 *
 * Requires exactly-once checkpointing: the filesystem/Hive streaming sink only
 * commits a partition (metastore + _SUCCESS file) on checkpoint completion.
 */
public class KafkaToHive {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Since Flink 1.12 event time is the default; setStreamTimeCharacteristic is
        // deprecated (and IngestionTime is gone). To fall back to processing time,
        // watermarks can be disabled by setting the interval to 0:
        env.getConfig().setAutoWatermarkInterval(0);
        env.setParallelism(2);
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // Exactly-once checkpoints every 20s — partition commits happen on checkpoint.
        tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
        // Register and activate the Hive catalog (hive-site.xml is read from hiveConfPath).
        String catalogName = "my_catalog";
        String db = "default";
        String hiveConfPath = "./src/main/resources";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, db, hiveConfPath);
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream");
        tEnv.executeSql("DROP TABLE IF EXISTS stream.kafka_log");
        // Kafka source table (default dialect). Fixes vs original: restored the '\n'
        // line breaks that had been mangled to a literal 'n' (which fused tokens and
        // broke the DDL), removed the duplicate order_amount column, and repaired the
        // missing closing quote + comma after 'json.fail-on-missing-field' = 'false'.
        tEnv.executeSql("CREATE TABLE stream.kafka_log (\n" +
                "  user_id STRING,\n" +
                "  order_amount DOUBLE,\n" +
                "  log_ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test',\n" +
                "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                "  'properties.group.id' = 'flink1',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")");
        // Hive sink table — must be created under the HIVE dialect. Fixes vs original:
        // 'PARTITIONED BY' (Hive has no 'partition by' in CREATE TABLE), dropped the
        // trailing comma before the partition clause, repaired the missing closing
        // quote + comma after 'partition-time', and removed 'format' = 'json' which
        // contradicted STORED AS PARQUET and is not a valid table property here.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test");
        tEnv.executeSql("DROP TABLE IF EXISTS test.hive_log");
        tEnv.executeSql("CREATE TABLE test.hive_log (\n" +
                "  user_id STRING,\n" +
                "  order_amount DOUBLE\n" +
                ") PARTITIONED BY (\n" +
                "  dt STRING,\n" +
                "  hr STRING\n" +
                ") STORED AS PARQUET\n" +
                "TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',\n" +
                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                "  'sink.partition-commit.delay' = '1min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file'\n" +
                ")");
        // Switch back to the default dialect for the streaming INSERT; dt/hr are
        // derived from the event timestamp so partitions follow event time.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("INSERT INTO test.hive_log\n" +
                "SELECT\n" +
                "  user_id,\n" +
                "  order_amount,\n" +
                "  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),\n" +
                "  DATE_FORMAT(log_ts, 'HH')\n" +
                "FROM stream.kafka_log");
    }
}
pom.xml文件如下所示:
org.apache.flink flink-connector-kafka_2.11 ${flink.version} org.apache.flink flink-streaming-java_${scala.compat.version} ${flink.version} org.apache.flink flink-statebackend-rocksdb_${scala.compat.version} ${flink.version} provided org.apache.flink flink-table-planner_2.11 ${flink.version} org.apache.flink flink-streaming-scala_${scala.compat.version} ${flink.version} org.apache.flink flink-clients_${scala.compat.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-table-api-scala-bridge_${scala.compat.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.compat.version} ${flink.version} org.apache.flink flink-connector-hive_${scala.compat.version} ${flink.version} org.apache.flink flink-json ${flink.version} org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-common ${hadoop.version} org.apache.hadoop hadoop-hdfs ${hadoop.version} org.apache.hadoop hadoop-yarn-client ${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} org.apache.hive hive-exec ${hive.version} org.pentaho pentaho-aggdesigner-algorithm org.apache.hbase hbase-server ${hbase.version} org.apache.hbase hbase-common ${hbase.version} org.apache.commons commons-pool2 2.4.2 org.apache.maven.plugins maven-compiler-plugin 3.5.1 1.8 1.8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single



