// 1. When writing to MySQL we use RichSinkFunction's open/close lifecycle, but this sink cannot guarantee state consistency if the streaming job fails, because it does not implement the two-phase commit protocol.
package flinkSourse
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer010, FlinkKafkaProducer011}
object FlinkSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Bounded stream: read the sample sensor file line by line (env.readTextFile).
    val rawLines: DataStream[String] = env.readTextFile("src/main/resources/sensorReading.txt")

    // Parse each CSV line into a SensorReading and render it back to a string.
    val readings: DataStream[String] = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble).toString
    }

    // 1. Sink to Kafka. FlinkKafkaProducer011 extends TwoPhaseCommitSinkFunction,
    //    so it supports the two-phase commit protocol for exactly-once delivery.
    readings.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sensorReading", new SimpleStringSchema()))

    // 2. Sink to MySQL via the custom RichSinkFunction below.
    readings.addSink(new MyMysqlSinkFunction())

    env.execute("sink")
  }
}
// Writes each incoming record to MySQL, using RichSinkFunction's open/close
// lifecycle to hold one JDBC connection per task instance.
// NOTE(review): this sink is at-least-once only — it does not extend
// TwoPhaseCommitSinkFunction, so state consistency is not guaranteed when the
// streaming job fails mid-flight.
class MyMysqlSinkFunction extends RichSinkFunction[String] {
  private var connection: Connection = _
  private var pstm: PreparedStatement = _

  // Open the connection and prepare the statement once per task, not per record.
  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbcurl", "root", "password")
    pstm = connection.prepareStatement("insert into sensoTmp (id,temperatur) values (?,?)")
  }

  // Bind parameters in column order: (id: String, temperatur: Double).
  // BUGFIX: the original bound them swapped — setDouble on parameter 1 (the id
  // column) and setString on parameter 2 (the temperature column) — and called
  // value.toDouble on the whole record string, which throws NumberFormatException.
  override def invoke(value: String): Unit = {
    // Assumes value has the case-class toString shape
    // "SensorReading(id,timestamp,temperature)" produced upstream — TODO confirm.
    val fields = value.stripPrefix("SensorReading(").stripSuffix(")").split(",")
    pstm.setString(1, fields(0).trim)
    pstm.setDouble(2, fields(2).trim.toDouble)
    // executeUpdate is the correct call for DML (returns the affected-row count).
    pstm.executeUpdate()
  }

  // Release JDBC resources; guard against a failed open() leaving nulls behind.
  override def close(): Unit = {
    if (pstm != null) pstm.close()
    if (connection != null) connection.close()
  }
}



