flink run -d -m yarn-cluster -ynm test2 -c xx.xx /home/baiyun/xxxx/target/xxxx.jar --xxx xx --xx xxx --xx xxx --tableName xx --jobName test2
注意:Scala 版本需与集群 Flink 版本匹配;Hadoop 依赖不要打进 jar 包中,以免与集群冲突;其他额外依赖需要放到集群 Flink 的 lib 目录下面。
1.8 2.11 2.0.0 1.2.0 1.10.1 2.7.1 maven-ali http://maven.aliyun.com/nexus/content/groups/public// true true always fail com.amazonaws aws-java-sdk-bom 1.11.903 pom import org.apache.hadoop hadoop-common ${hadoop.version} provided org.apache.hadoop hadoop-hdfs ${hadoop.version} provided xml-apis xml-apis mysql mysql-connector-java 5.1.44 software.amazon.awssdk kinesis 2.0.0 com.amazonaws aws-kinesisanalytics-runtime ${kda.runtime.version} org.apache.flink flink-connector-kinesis_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} provided org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} provided com.amazonaws aws-kinesisanalytics-flink ${kda.version} org.json4s json4s-native_2.10 3.2.11 com.alibaba fastjson 1.2.62 org.apache.bahir flink-connector-redis_2.11 1.0 provided org.scala-tools maven-scala-plugin 2.15.2 compile testCompile org.apache.maven.plugins maven-assembly-plugin 3.3.0 jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-jar-plugin 2.4 true lib/ com.example.MainClass
class MyJdbcSink(tableName: String) extends RichSinkFunction[(String, String, String, Long, String, Long)] {
  // JDBC connection and pre-compiled statements; created in open(), released in close().
  var conn: Connection = _
  var updateStmt: PreparedStatement = _
  var insertStmt: PreparedStatement = _

  /** Initializes the runtime context for this rich function: opens the JDBC
   *  connection and pre-compiles the UPDATE/INSERT statements used by invoke().
   *  URL, USER and PASSWORD are defined elsewhere in this file.
   */
  override def open(parameters: Configuration): Unit = {
    println("----------------------------open函数初始化JDBC连接及预编译sql-------------------------")
    super.open(parameters)
    conn = DriverManager.getConnection(URL, USER, PASSWORD)
    insertStmt = conn.prepareStatement(s"INSERT INTO xx.$tableName (prt_dt, project, uv, pv,update_date) VALUES (?, ?, ?, ?,?)")
    updateStmt = conn.prepareStatement(s"UPDATE xx.$tableName set uv = ?, pv = ? ,update_date=? where prt_dt = ? and project = ?")
  }

  /** Releases JDBC resources. Each handle is null-guarded: open() may have
   *  failed partway through (e.g. getConnection threw), and Flink still calls
   *  close() in that case — an unguarded close() would NPE and mask the
   *  original failure.
   */
  override def close(): Unit = {
    println("-----------------------关闭连接,并释放资源-----------------------")
    if (updateStmt != null) updateStmt.close()
    if (insertStmt != null) insertStmt.close()
    if (conn != null) conn.close()
    // Mirror open(): let the superclass clean up its own state too.
    super.close()
  }

  /** Upserts one aggregated record: tries UPDATE first, and falls back to
   *  INSERT when no existing row matched (prt_dt, project).
   *  Tuple layout: _1 = prt_dt, _2 = project, _4 = uv, _6 = pv — presumably;
   *  TODO confirm against the upstream operator that emits this tuple.
   *  NOTE(review): _4/_6 are narrowed Long -> Int here; values above
   *  Int.MaxValue would overflow silently.
   */
  override def invoke(in: (String, String, String, Long, String, Long)): Unit = {
    val update_date = fmS.format(System.currentTimeMillis())
    val value = DauSs(in._1, in._2, in._4.toInt, in._6.toInt, update_date)
    println("-------------------------执行sql---------------------------")
    // Try the UPDATE first; executeUpdate() returns the affected-row count
    // directly (clearer than execute() followed by getUpdateCount()).
    updateStmt.setInt(1, value.uv)
    updateStmt.setInt(2, value.pv)
    updateStmt.setString(3, value.update_date)
    updateStmt.setString(4, value.prt_dt)
    updateStmt.setString(5, value.project)
    val updated = updateStmt.executeUpdate()
    // No row existed for this (prt_dt, project) key — insert a fresh one.
    if (updated == 0) {
      insertStmt.setString(1, value.prt_dt)
      insertStmt.setString(2, value.project)
      insertStmt.setInt(3, value.uv)
      insertStmt.setInt(4, value.pv)
      insertStmt.setString(5, value.update_date)
      insertStmt.executeUpdate()
    }
  }
}



