栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink的sink方法

flink的sink方法

1、写入到mysql时 利用到了RichFunciton的生命周期,但是实时计算出错不能保证状态的一致性,没有实现了两段锁提交协议

package flinkSourse

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer010, FlinkKafkaProducer011}

object FlinkSink {

  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    // 有界流  env.readTextFile
    val stream2: DataStream[String] = executionEnvironment.readTextFile("src/main/resources/sensorReading.txt")
    val transforStream: DataStream[String] = stream2.map(data => {
      val tmpList: Array[String] = data.split(",")
      SensorReading(tmpList(0), tmpList(1).toLong, tmpList(2).toDouble).toString
    })
    //1、写入kafka
    //public class FlinkKafkaProducer011 extends TwoPhaseCommitSinkFunction 实现了两段锁提交协议
    transforStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sensorReading", new SimpleStringSchema()))

    //2、写入mysql
    transforStream.addSink(new MyMysqlSinkFunction() )

    executionEnvironment.execute("sink")
  }
}

//利用到了RichFunciton的生命周期,但是当实时计算出错不能保证状态的一致性,没有实现了两段锁提交协议
class MyMysqlSinkFunction extends RichSinkFunction[String] {
  private var connection: Connection = _
  private var pstm: PreparedStatement = _


  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbcurl", "root", "password")
    pstm = connection.prepareStatement("insert into sensoTmp (id,temperatur) values (?,?)");
  }

  override def invoke(value: String): Unit = {
    pstm.setDouble(1, value.toDouble)
    pstm.setString(2, value)
    pstm.execute();
  }

  override def close(): Unit = {
    pstm.close()
    connection.close()

  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389101.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号