栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink yarn-per-job模式 后台执行 定义 yarn名称 定义 flinkjob名称 jdbcSink动态传参 消费aws kinesis配置aliyun scala编译

flink yarn-per-job模式 后台执行 定义 yarn名称 定义 flinkjob名称 jdbcSink动态传参 消费aws kinesis配置aliyun scala编译

flink run -d -m yarn-cluster -ynm test2 -c xx.xx /home/baiyun/xxxx/target/xxxx.jar --xxx xx --xx xxx --xx xxx --tableName xx --jobName test2
注意scala版本与集群flink匹配,hadoop版本不要带到jar包中,以免与集群冲突,另外的依赖需要放到flink lib目录下面


    
        1.8
        2.11
        2.0.0
        1.2.0
        1.10.1
        2.7.1
    
    
        
            maven-ali
            http://maven.aliyun.com/nexus/content/groups/public//
            
                true
            
            
                true
                always
                fail
            
        


    


    
        
            
                com.amazonaws
                aws-java-sdk-bom
                
                1.11.903
                pom
                import
            
        
    

    

        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            provided
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            provided
            
                
                    xml-apis
                    xml-apis
                
            
        

        
            mysql
            mysql-connector-java
            5.1.44
        

        
            software.amazon.awssdk
            kinesis
            2.0.0
        

        
            com.amazonaws
            aws-kinesisanalytics-runtime
            ${kda.runtime.version}
        

        
            org.apache.flink
            flink-connector-kinesis_${scala.binary.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
                        provided
        


        
            org.apache.flink
            flink-java
            ${flink.version}
                        provided
        

        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
                        provided
        

        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
                        provided
        

        
            com.amazonaws
            aws-kinesisanalytics-flink
            ${kda.version}
        


        
            org.json4s
            json4s-native_2.10
            3.2.11

        
        
            com.alibaba
            fastjson
            1.2.62
        
        
            org.apache.bahir
            flink-connector-redis_2.11
            1.0
                        provided
        

    

    
        

            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        
                            compile
                            testCompile
                        
                    
                
            



            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.3.0
                
                    
                        
                            jar-with-dependencies
                        
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-jar-plugin
                2.4
                
                    
                        
                            true
                            lib/
                            com.example.MainClass
                        
                    
                
            

        
    

  class MyJdbcSink(tableName:String) extends RichSinkFunction[(String, String, String, Long, String, Long)] {
    // 定义一些变量:JDBC连接、sql预编译器()
    var conn: Connection = _
    var updateStmt: PreparedStatement = _
    var insertStmt: PreparedStatement = _

    // open函数用于初始化富函数运行时的上下文等环境,如JDBC连接
    override def open(parameters: Configuration): Unit = {
      println("----------------------------open函数初始化JDBC连接及预编译sql-------------------------")
      super.open(parameters)
      conn = DriverManager.getConnection(URL, USER, PASSWORD)
      insertStmt = conn.prepareStatement(s"INSERT INTO xx.$tableName (prt_dt, project, uv, pv,update_date) VALUES (?, ?, ?, ?,?)")
      updateStmt = conn.prepareStatement(s"UPDATE xx.$tableName set uv = ?, pv = ? ,update_date=? where prt_dt = ? and project = ?")
    }
    // 调JDBC连接,执行SQL

    // 关闭时做清理工作
    override def close(): Unit = {
      println("-----------------------关闭连接,并释放资源-----------------------")
      updateStmt.close()
      insertStmt.close()
      conn.close()
    }

    override def invoke(in: (String, String, String, Long, String, Long)): Unit = {
      val update_date = fmS.format(System.currentTimeMillis())

      val value = DauSs(in._1, in._2, in._4.toInt, in._6.toInt,update_date)

      println("-------------------------执行sql---------------------------")
      // 执行更新语句
      updateStmt.setInt(1, value.uv)
      updateStmt.setInt(2, value.pv)
      updateStmt.setString(3, value.update_date)
      updateStmt.setString(4, value.prt_dt)
      updateStmt.setString(5, value.project)
      updateStmt.execute()
      // 如果update没有查到数据,那么执行insert语句
      if (updateStmt.getUpdateCount == 0) {
        insertStmt.setString(1, value.prt_dt)
        insertStmt.setString(2, value.project)
        insertStmt.setInt(3, value.uv)
        insertStmt.setInt(4, value.pv)
        insertStmt.setString(5, value.update_date)
        insertStmt.execute()
      }
    }
  }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604079.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号