
Flink 1.14: pitfalls when consuming Kafka and writing to Hive with the Flink Table API

I've been working with Flink recently and tried one of the newer releases. The job ran for a long time and files showed up when I looked in HDFS, but querying the table in Hive returned nothing. I tried a number of remedies; several posts suggest refreshing the Hive table yourself, as follows:

The first way, refreshing by adding the partition manually:
alter table t_kafkamsg2hivetable add partition(dt='2022-03-04',hr=11);

The second way, which is really a repair:
msck repair table t_kafkamsg2hivetable;
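
These statements can also be issued from code through Flink's TableEnvironment after switching to the Hive dialect, which is handy if you want the repair to live next to the job. A minimal sketch, assuming the same HiveCatalog setup shown in section 2 below (catalog, database, and table names are the ones from this post; the class name is made up for the example):

// Sketch: running the manual partition refresh through Flink instead of the Hive CLI.
// Assumes the "myhive" catalog and "mydatabase" database from section 2 of this post.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class AddPartitionManually {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "mydatabase", "/etc/hive/conf/"));
        tableEnv.useCatalog("myhive");
        tableEnv.useDatabase("mydatabase");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // same statement as the first refresh above, just driven from code
        tableEnv.executeSql("alter table t_kafkamsg2hivetable add partition(dt='2022-03-04', hr=11)");
    }
}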

I read a lot of blog posts, but in the end the answer was in the official documentation. Code first:

The project was generated from the official Flink walkthrough example, see:

Fraud Detection with the DataStream API | Apache Flink

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.14.3 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
Contents of the pom.xml (a bit of a grab-bag; pull in only what you need):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>frauddetection</groupId>
    <artifactId>frauddetection</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Walkthrough DataStream Java</name>
    <url>https://flink.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.3</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
        <hive.version>2.1.1</hive.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Core Flink dependencies, provided by the cluster at runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.11</artifactId>
            <version>1.14.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>1.14.3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <!-- Kafka / JDBC connectors and helpers -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.8</version>
        </dependency>

        <!-- Table API / SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.14.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.14.3</version>
            <scope>provided</scope>
        </dependency>

        <!-- Hive / Hadoop -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Shade plugin to build a fat jar without unnecessary dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- the transformer / main-class part of this execution was truncated in the original post -->
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. Producing mock JSON data into Kafka

package spendreport.kafka.test;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Custom non-parallel source that emits one JSON message every 5 seconds.
// NOTE: the class declaration, fields and sample IPs below are reconstructed from how the
// class is used later in this post; the original post is truncated at this point.
public class MyJsonParalleSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;
    private final List<String> ipList = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5");

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // pick a random ip and build the JSON payload
            int i = new Random().nextInt(5);
            JSONObject jsonObject = JSONUtil.createObj()
                    .set("ip", ipList.get(i))
                    .set("msg", ipList.get(i) + "接进来,请接听对应信息")
                    .set("ts", System.currentTimeMillis());
            String book = jsonObject.toString();
            ctx.collect(book);
            System.out.println("-----------------" + book);
            // emit one record every 5 seconds
            Thread.sleep(5000);
        }
    }

    // called when the job is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}
package spendreport.kafka.test;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.TimeZone;


public class KafkaProductMain {
    public static final Random random = new Random();
    public static void main(String[] args) throws Exception {
        // kafkaproduct-0.1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        DataStreamSource text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<String> text = env.addSource(new MyJsonParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
//        FlinkKafkaProducer producer = new FlinkKafkaProducer("test_flink",new SimpleStringSchema(),properties);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test_flink", new SimpleStringSchema(), properties);

        text.addSink(producer);
        env.execute();
    }
}
2. Using the Flink Table API and Flink SQL to move the data into Hive
package spendreport.hive;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class SinkHiveTest {
    public static void main(String[] args) {

        System.setProperty("HADOOP_USER_NAME", "hive");
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();

        // First way to create the table environment ---------- start ------------------
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        String jarFile = "D:\\ljpPro\\frauddetection-0.1.jar";
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.37.103", 8081, jarFile);
        // When creating a StreamTableEnvironment from a StreamExecutionEnvironment, checkpointing must be enabled on the StreamExecutionEnvironment
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.hive.fallback-mapred-reader", "true");
        // If some partitions of the topic are idle, the watermark generator will not advance. Set the 'table.exec.source.idle-timeout' option in the table config to avoid this.
        configuration.setString("table.exec.source.idle-timeout", "10s");
        // First way to create the table environment ------------------ end ----------------------

        // Second way to create the table environment ---------- start ------------------
//        TableEnvironment tableEnv = TableEnvironment.create(settings);
//        Configuration configuration = tableEnv.getConfig().getConfiguration();
//        configuration.setString("table.exec.hive.fallback-mapred-reader", "true");
//        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
//        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(2));
        // Second way to create the table environment ------------------ end ----------------------


        // 1. Create the HiveCatalog
        String name = "myhive";
        String defaultDatabase = "mydatabase";
        String hiveConfDir = "/etc/hive/conf/";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // 2. Register the HiveCatalog
        tableEnv.registerCatalog(name, hive);
        // 3. Make the HiveCatalog "myhive" the current session catalog
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(defaultDatabase);
        // Switch to the Hive dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 4. Create the Hive table that will be the sink

        tableEnv.executeSql("drop table if exists t_kafkaMsg2hiveTable");

        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_kafkaMsg2hiveTable ("

                + "ip STRING,"

                + "msg STRING"

                + ")"

                + " PARTITIonED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ("

                + " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," // hive 分区提取器提取时间戳的格式

                + " 'sink.partition-commit.trigger'='partition-time'," // 分区触发提交的类型可以指定 "process-time" 和 "partition-time" 处理时间和分区时间

                + " 'sink.partition-commit.delay'='5s'," // 提交延迟
//                + " 'table.exec.source.idle-timeout'='10s'," //  如果 topic 中的某些分区闲置,watermark 生成器将不会向前推进。 你可以在表配置中设置 'table.exec.source.idle-timeout' 选项来避免上述问题
                + " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', " //-- Assume user configured time zone is 'Asia/Shanghai'
                + " 'sink.partition-commit.policy.kind'='metastore,success-file'" // 提交类型

                + ")");


        // Switch back to the default dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);


        // 5. Create the Kafka-backed source table; once created, Flink reads from Kafka and makes the data available to the query

        tableEnv.executeSql("drop table if exists t_KafkaMsgSourceTable");

        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_KafkaMsgSourceTable ("
                + "ip STRING"
                + ",msg STRING"
                + ",ts BIGINT" //13位原始时间戳
//                + ",ts3 AS TO_TIMESTAMP(FROM_UNIXTIME(ts/ 1000, 'yyyy-MM-dd HH:mm:ss'))" //flink的TIMESTAMP(3)格式
                + ",ts3 AS TO_TIMESTAMP_LTZ(ts, 3)" //flink的TIMESTAMP(3)格式
                + ",WATERMARK FOR ts3 AS ts3 - INTERVAL '5' SECOND" //水印最迟5s
                + ")"
                + " WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'test_flink',"
                + " 'properties.bootstrap.servers' = 'xxx:9092',"
                + " 'properties.group.id' = 'kafkaflinkhivedemo',"
                + " 'scan.startup.mode' = 'earliest-offset'," //earliest-offset
                + " 'format' = 'json',"
                + " 'json.ignore-parse-errors' = 'true'"
                + ")");


        // 6. Wire the two tables together: stream from the Kafka source table into the Hive table
        tableEnv.executeSql("INSERT INTO t_kafkaMsg2hiveTable "
                + "SELECT ip,msg,DATE_FORMAT(ts3, 'yyyy-MM-dd'), DATE_FORMAT(ts3, 'HH') FROM t_KafkaMsgSourceTable").print();

//        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//        tableEnv.executeSql("alter table t_kafkamsg2hivetable add partition(dt='2022-03-04',hr=11)");
//        tableEnv.executeSql("SELECt * FROM t_kafkamsg2hivetable WHERe dt='2022-03-04' and hr='11'").print();
//        tableEnv.executeSql("select * from t_KafkaMsgSourceTable").print();

    }
}
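
Once the job has been running for a bit, you can check from a separate program whether partitions are actually being committed to the metastore. A small sketch based on the commented-out queries above, reusing the same catalog, database, and table names (the class name and the dt/hr values are illustrative):

// Sketch: verify that the streaming job really committed partitions to the Hive metastore.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CheckHivePartitions {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hive"); // same user as in SinkHiveTest
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "mydatabase", "/etc/hive/conf/"));
        tableEnv.useCatalog("myhive");
        tableEnv.useDatabase("mydatabase");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // adjust dt/hr to a partition the job should have committed by now
        tableEnv.executeSql("SELECT * FROM t_kafkamsg2hivetable WHERE dt='2022-03-04' and hr='11'").print();
    }
}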
3. Resource files

The following files need to be included; copy them from your Hadoop configuration directory:

core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml

There also needs to be a hive-site.xml under /etc/hive/conf/. And when running from IntelliJ IDEA, you also need to tick the relevant option in the run configuration.
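
A quick way to confirm that those XML files really end up on the classpath at runtime is to load a Hadoop Configuration and print a representative key. A minimal sketch (fs.defaultFS is just one key to spot-check; the class name is made up for the example):

// Sketch: check that core-site.xml from src/main/resources is picked up from the classpath.
// If it is not, fs.defaultFS falls back to the built-in default "file:///" and writes
// would go to the local filesystem instead of HDFS.
import org.apache.hadoop.conf.Configuration;

public class CheckHadoopConf {
    public static void main(String[] args) {
        Configuration conf = new Configuration(); // loads core-default.xml and core-site.xml from the classpath
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
        System.out.println("core-site.xml found at: " + conf.getResource("core-site.xml"));
    }
}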

In theory that is all it takes. Back to the issue from the beginning, where the table could not be queried in Hive: my understanding is that the number of partitions in the Kafka topic did not match the parallelism on the Flink side, so some source subtasks stayed idle, the watermark never advanced, and the partition-time commit trigger never fired. Flink kept writing data to HDFS, but the partitions were never committed to the Hive metastore. Adding the following configuration fixed it:

configuration.setString("table.exec.source.idle-timeout", "10s");

How many seconds is up to you; I found this option in the Flink documentation.
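
Putting it together, there are two knobs for the idle-partition trap: make the source parallelism match the topic's partition count so no subtask sits idle, or set the idle timeout so the watermark advances anyway. A minimal sketch of where both settings live, in the same setup as SinkHiveTest (the parallelism value 3 is illustrative and should equal your topic's partition count; the class name is made up for the example):

// Sketch: the two ways to keep partition-time commits moving when Kafka partitions are idle.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdlePartitionSettings {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); // partition commits happen on checkpoints

        // Option A: keep the source parallelism equal to the Kafka topic's partition count
        // so that no source subtask is idle (3 is an illustrative value).
        env.setParallelism(3);

        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        // Option B: let the watermark advance even when some partitions are idle, so the
        // 'partition-time' commit trigger can still fire and the Hive metastore gets updated.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10s");
    }
}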

Of course, standing on the shoulders of those who came before, I drew on many other bloggers' write-ups, including:

【flink】【kafka】【hive】flink消费kafka数据存到hive - xiaostudy - 博客园

FlinkSQL Kafka to Hive_L, there!-CSDN博客
