栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink1.13 读取kafka写入hive

flink1.13 读取kafka写入hive

代码如下所示:

/**
 * Flink 1.13 streaming job that reads JSON records from a Kafka topic and writes them
 * into an hourly-partitioned Parquet table in Hive, using the Table/SQL API.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>create a streaming table environment with exactly-once checkpointing,</li>
 *   <li>register a {@code HiveCatalog} and make it the current catalog,</li>
 *   <li>declare the Kafka source table (default dialect),</li>
 *   <li>declare the Hive sink table (Hive dialect),</li>
 *   <li>submit the continuous {@code INSERT INTO ... SELECT} statement.</li>
 * </ol>
 */
public class KafkaToHive {

    /** DDL of the Kafka-backed source table; {@code log_ts} is the event-time attribute. */
    private static final String CREATE_KAFKA_TABLE =
            "create table stream.kafka_log(\n" +
            "user_id String,\n" +
            "order_amount Double,\n" +
            "log_ts Timestamp(3),\n" +
            "WATERMARK FOR log_ts AS log_ts -INTERVAL '5' SECOND\n" +
            " )WITH(\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'test',\n" +
            " 'properties.bootstrap.servers' = 'node1:9092',\n" +
            " 'properties.group.id' = 'flink1',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'json',\n" +
            " 'json.fail-on-missing-field' = 'false',\n" +
            " 'json.ignore-parse-errors' = 'true'\n" +
            " )";

    /**
     * DDL of the Hive sink table (must be executed with the Hive dialect).
     * Partitions are committed based on the watermark ("partition-time" trigger), so the
     * source table has to produce watermarks.
     */
    private static final String CREATE_HIVE_TABLE =
            " create table test.hive_log(\n" +
            " user_id String,\n" +
            " order_amount double\n" +
            " )partitioned by (\n" +
            " dt STRING,\n" +
            " hr STRING\n" +
            " )stored as PARQUET\n" +
            " tblproperties(\n" +
            " 'sink.partition-commit.trigger' = 'partition-time',\n" +
            " 'sink.partition-commit.delay' = '1min',\n" +
            " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
            " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'\n" +
            " )";

    /** Continuous query that derives the dt/hr partition values from the event time. */
    private static final String INSERT_INTO_HIVE =
            "insert into test.hive_log \n" +
            "  select \n" +
            "  user_id,\n" +
            "  order_amount,\n" +
            "  DATE_FORMAT(log_ts,'yyyy-MM-dd'),\n" +
            "  DATE_FORMAT(log_ts,'HH')\n" +
            "  from stream.kafka_log";

    /**
     * Builds the table environment, creates both tables and submits the insert job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Since Flink 1.12 event time is the default time characteristic, so the deprecated
        // setStreamTimeCharacteristic(...) call is not needed.
        // The auto-watermark interval is deliberately left at its default: setting it to 0
        // disables periodic watermarks, and the Hive sink's "partition-time" commit trigger
        // needs advancing watermarks to ever commit a partition.

        env.setParallelism(2);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Exactly-once checkpoints every 20 seconds; the streaming file/Hive sink only
        // finalizes written files when a checkpoint completes.
        tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        // Hive catalog configuration; hiveConfPath is the directory containing hive-site.xml.
        String catalogName = "my_catalog";
        String db = "default";
        String hiveConfPath = "./src/main/resources";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, db, hiveConfPath);
        // Register the catalog and make it the current one.
        tEnv.registerCatalog(catalogName, hiveCatalog);
        tEnv.useCatalog(catalogName);

        // Kafka source table, declared with the default (Flink) dialect.
        tEnv.executeSql("CREATE DATAbase IF NOT EXISTS stream");
        tEnv.executeSql("DROp TABLE IF EXISTS stream.kafka_log");
        tEnv.executeSql(CREATE_KAFKA_TABLE);

        // Hive sink table, declared with the Hive dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("CREATE DATAbase IF NOT EXISTS test");
        tEnv.executeSql("DROP TABLE IF EXISTS test.hive_log");
        tEnv.executeSql(CREATE_HIVE_TABLE);

        // Switch back to the default dialect and start copying Kafka records into Hive.
        // NOTE(review): executeSql submits the INSERT job asynchronously; when running inside
        // an IDE you may need to wait on the returned TableResult - confirm for your setup.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(INSERT_INTO_HIVE);
    }
}

pom.xml文件如下所示:

 
        
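    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- NOTE: duplicate of the previous dependency (without the provided scope), kept as in the original -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.compat.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.pentaho</groupId>
                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- NOTE: second declaration of maven-compiler-plugin, kept as in the original -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>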
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653647.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号