I've been working with Flink recently and tried out a fairly recent version. The job ran for a long time and files showed up in HDFS, but querying the table in Hive returned nothing. I tried a number of approaches; some suggest refreshing the Hive table yourself, like this:
Method 1, refresh by adding the partition manually:
alter table t_kafkamsg2hivetable add partition(dt='2022-03-04',hr=11);
Method 2, refresh, or rather repair, so the metastore rediscovers the partition directories on HDFS:
msck repair table t_kafkamsg2hivetable;
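If you prefer to stay inside Flink, both statements can also be issued through the Hive dialect (the sink program further down does exactly this, commented out). A minimal sketch, assuming tableEnv is a table environment with the HiveCatalog already registered, and assuming your Flink version's Hive dialect accepts these statements:

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// Register one partition directory that already exists on HDFS
tableEnv.executeSql("alter table t_kafkamsg2hivetable add partition(dt='2022-03-04',hr='11')");
// Or have Hive scan HDFS and register all missing partitions (assumes the dialect supports msck)
tableEnv.executeSql("msck repair table t_kafkamsg2hivetable");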
I read a lot of blog posts, but in the end the answer was in the official docs. Code first:
The project was generated from the example the official Flink docs provide; the guide is here:
Fraud Detection with the DataStream API | Apache Flink
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.14.3 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
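After generating the skeleton, the fat jar is built the usual Maven way (frauddetection is the artifactId chosen above):

$ cd frauddetection
$ mvn clean package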
Contents of the pom file, condensed here to Maven coordinates (it is a bit of a grab bag; pull in only what you need):
Project coordinates: frauddetection:frauddetection:0.1, packaging jar, name "Flink Walkthrough DataStream Java", url https://flink.apache.org

Properties:
project.build.sourceEncoding = UTF-8
flink.version = 1.14.3
target.java.version = 1.8 (also used as maven.compiler.source/target)
scala.binary.version = 2.11
log4j.version = 2.17.1
hive.version = 2.1.1
hadoop.version = 2.6.0

Dependencies (groupId:artifactId:version, scope in brackets where it is not the default):
org.apache.flink:flink-walkthrough-common_${scala.binary.version}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.binary.version}:${flink.version} [provided]
org.apache.flink:flink-clients_${scala.binary.version}:${flink.version} [provided]
org.apache.logging.log4j:log4j-slf4j-impl:${log4j.version} [runtime]
org.apache.logging.log4j:log4j-api:${log4j.version} [runtime]
org.apache.logging.log4j:log4j-core:${log4j.version} [runtime]
junit:junit:4.12 [compile]
org.apache.flink:flink-test-utils_2.11:1.14.3 [test]
org.apache.flink:flink-runtime:1.14.3
org.projectlombok:lombok:1.18.20
org.apache.flink:flink-connector-kafka_2.11:1.14.3
org.apache.flink:flink-connector-jdbc_2.11:1.14.3
mysql:mysql-connector-java:8.0.16
cn.hutool:hutool-all:5.5.8
org.apache.flink:flink-table-api-java-bridge_2.11:1.14.3 [provided]
org.apache.flink:flink-table-planner_2.11:1.14.3 [provided]
org.apache.flink:flink-table-common:1.14.3 [provided]
org.apache.flink:flink-connector-hive_${scala.binary.version}:${flink.version}
org.apache.flink:flink-json:${flink.version}
org.apache.hive:hive-exec:${hive.version}
org.apache.flink:flink-hadoop-compatibility_2.11:${flink.version}
org.apache.hadoop:hadoop-common:${hadoop.version}
org.apache.hadoop:hadoop-client:${hadoop.version}

Build plugins: maven-compiler-plugin 3.1 with source/target ${target.java.version}, and maven-shade-plugin 3.0.0 bound to the package phase (shade goal), excluding org.apache.flink:flink-shaded-force-shading, com.google.code.findbugs:jsr305, org.slf4j:* and org.apache.logging.log4j:* from the fat jar and stripping META-INF signature files.

1. Simulating data and writing it to Kafka

A custom source that emits one JSON message every 5 seconds. Only run() and cancel() come from the original; the class declaration, the isRunning flag, the imports, and the placeholder ipList are assumptions filled in so it compiles:

package spendreport.kafka.test;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class MyJsonParalleSource implements SourceFunction<String> {
    private volatile boolean isRunning = true;
    // Placeholder IPs; the original list was not shown
    private final List<String> ipList = Arrays.asList(
            "192.168.0.1", "192.168.0.2", "192.168.0.3", "192.168.0.4", "192.168.0.5");
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // Build a JSON payload around a randomly chosen ip
            int i = new Random().nextInt(5);
            JSONObject jsonObject = JSONUtil.createObj()
                    .set("ip", ipList.get(i))
                    .set("msg", ipList.get(i) + " is calling in, please answer")
                    .set("ts", System.currentTimeMillis());
            String book = jsonObject.toString();
            ctx.collect(book);
            System.out.println("-----------------" + book);
            // Emit one record every 5 seconds
            Thread.sleep(5000);
        }
    }
    // Called when the job is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}

The main class below then pushes these messages into the Kafka topic test_flink:
package spendreport.kafka.test;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaProductMain {
    public static void main(String[] args) throws Exception {
        // kafkaproduct-0.1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<String> text = env.addSource(new MyJsonParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092");
        // Exactly-once variant:
        // new FlinkKafkaProducer<>("topn", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test_flink", new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute();
    }
}
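Before moving on to the Hive side, it's worth confirming that messages actually arrive in the topic. One quick way, assuming a standard Kafka installation on the broker host (xxx stands for your broker address, as above):

$ kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic test_flink --from-beginning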
2. Loading the data into Hive with the Flink Table API and Flink SQL
package spendreport.hive;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.time.Duration;
public class SinkHiveTest {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hive");
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();

        // Option 1: build a StreamTableEnvironment on top of a StreamExecutionEnvironment ---- start ----
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // String jarFile = "D:\\ljpPro\\frauddetection-0.1.jar";
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.37.103", 8081, jarFile);
        // When the table environment is created from a StreamExecutionEnvironment, checkpointing
        // must be enabled on the StreamExecutionEnvironment, otherwise partitions are never committed
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // deprecated since Flink 1.12; event time is already the default
        // env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.hive.fallback-mapred-reader", "true");
        // If some partitions of the Kafka topic are idle, the watermark generator never advances.
        // Setting the 'table.exec.source.idle-timeout' option in the table config avoids that problem.
        configuration.setString("table.exec.source.idle-timeout", "10s");
        // Option 1 ------------------ end ----------------------

        // Option 2: build a TableEnvironment directly ---- start ----
        // TableEnvironment tableEnv = TableEnvironment.create(settings);
        // Configuration configuration = tableEnv.getConfig().getConfiguration();
        // configuration.setString("table.exec.hive.fallback-mapred-reader", "true");
        // configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(2));
        // Option 2 ------------------ end ----------------------

        // 1. Create the HiveCatalog
        String name = "myhive";
        String defaultDatabase = "mydatabase";
        String hiveConfDir = "/etc/hive/conf/";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // 2. Register the HiveCatalog
        tableEnv.registerCatalog(name, hive);
        // 3. Make the HiveCatalog "myhive" the catalog of the current session
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(defaultDatabase);

        // Switch to the Hive dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 4. Create the Hive table that serves as the destination
        tableEnv.executeSql("drop table if exists t_kafkaMsg2hiveTable");
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_kafkaMsg2hiveTable ("
                + "ip STRING,"
                + "msg STRING"
                + ")"
                + " PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ("
                + " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," // pattern the partition time extractor uses to build a timestamp from the partition values
                + " 'sink.partition-commit.trigger'='partition-time'," // commit trigger: 'process-time' (processing time) or 'partition-time' (partition time, driven by watermarks)
                + " 'sink.partition-commit.delay'='5s'," // commit delay
                // + " 'table.exec.source.idle-timeout'='10s'," // note: this is a job-level config option, set above via configuration, not a table property
                + " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', " // assume the user-configured time zone is 'Asia/Shanghai'
                + " 'sink.partition-commit.policy.kind'='metastore,success-file'" // commit policies: update the metastore and write a _SUCCESS file
                + ")");

        // Switch back to the default dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // 5. Create the Kafka source table; once created it reads from Kafka into Flink
        tableEnv.executeSql("drop table if exists t_KafkaMsgSourceTable");
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_KafkaMsgSourceTable ("
                + "ip STRING"
                + ",msg STRING"
                + ",ts BIGINT" // raw 13-digit epoch-millisecond timestamp
                // + ",ts3 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss'))" // Flink TIMESTAMP(3)
                + ",ts3 AS TO_TIMESTAMP_LTZ(ts, 3)" // Flink TIMESTAMP_LTZ(3)
                + ",WATERMARK FOR ts3 AS ts3 - INTERVAL '5' SECOND" // allow events up to 5s late
                + ")"
                + " WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'test_flink',"
                + " 'properties.bootstrap.servers' = 'xxx:9092',"
                + " 'properties.group.id' = 'kafkaflinkhivedemo',"
                + " 'scan.startup.mode' = 'earliest-offset',"
                + " 'format' = 'json',"
                + " 'json.ignore-parse-errors' = 'true'"
                + ")");

        // 6. Pipe the source table into the Hive table
        tableEnv.executeSql("INSERT INTO t_kafkaMsg2hiveTable "
                + "SELECT ip, msg, DATE_FORMAT(ts3, 'yyyy-MM-dd'), DATE_FORMAT(ts3, 'HH') FROM t_KafkaMsgSourceTable").print();
        // tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // tableEnv.executeSql("alter table t_kafkamsg2hivetable add partition(dt='2022-03-04',hr=11)");
        // tableEnv.executeSql("SELECT * FROM t_kafkamsg2hivetable WHERE dt='2022-03-04' and hr='11'").print();
        // tableEnv.executeSql("select * from t_KafkaMsgSourceTable").print();
    }
}
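Once partitions start committing you can verify from the HDFS side as well: with the 'success-file' commit policy each committed partition directory ends up with a _SUCCESS marker. A sketch, assuming the default warehouse location (adjust the path to your hive.metastore.warehouse.dir):

$ hdfs dfs -ls /user/hive/warehouse/mydatabase.db/t_kafkamsg2hivetable/dt=2022-03-04/hr=11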
3. Importing the resource files
The following files need to go into the project's resources directory; grab them from your Hadoop configuration directory:
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
There also has to be a hive-site.xml under /etc/hive/conf/ (the hiveConfDir handed to the HiveCatalog above). Finally, when running from IDEA, the run configuration needs the option that puts provided-scope dependencies on the classpath enabled, since the Flink dependencies in the pom are marked provided.
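For reference, a minimal hive-site.xml mostly just needs to point at the metastore; the thrift address below is a placeholder for your metastore host:

<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://xxx:9083</value>
    </property>
</configuration>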
In theory that is all it takes. Coming back to the problem from the start, where the table stayed empty: my understanding is that the Kafka topic's partitions and Flink's parallel source subtasks did not line up, so some subtasks never received data, the watermark never advanced, and Flink kept writing files to HDFS without ever committing the partitions to the Hive metastore. Adding the following configuration fixed it:
configuration.setString("table.exec.source.idle-timeout", "10s");
Choose the number of seconds to suit yourself; I found the option in the Flink documentation.
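A complementary mitigation, hinted at by the commented-out setParallelism call in the code above, is to keep the source parallelism no higher than the topic's partition count, so that no source subtask sits idle holding the watermark back. A sketch, assuming test_flink has a single partition:

// One source subtask means every subtask receives data and the watermark can advance
env.setParallelism(1);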
Of course, trees planted by those before give shade to those who come after: I drew on the experience of quite a few bloggers along the way, including:
【flink】【kafka】【hive】flink消费kafka数据存到hive - xiaostudy - 博客园
FlinkSQL Kafka to Hive_L, there!-CSDN博客