栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink写入数据到MySQL案例

Flink写入数据到MySQL案例

案例准备:

1、启动MySQL,在mysql中创建数据库flinkdb,并创建表sensor_temp

CREATE TABLE sensor_temp  (
  id varchar(32),
  temp double
) 
代码实现:
def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[SensorReading] = env.addSource(new MyDefSource)
    dataStream.addSink(new MyJdbcSinkFunction())

    env.execute()
}

class MyJdbcSinkFunction extends RichSinkFunction[SensorReading]{
  var connection: Connection =_
  var insertStmt: PreparedStatement=_
  var updateStmt: PreparedStatement=_
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1,value.temperature)
    updateStmt.setString(2,value.id)
    updateStmt.execute()

    if(updateStmt.getUpdateCount == 0){
      insertStmt.setString(1,value.id)
      insertStmt.setDouble(2,value.temperature)
      insertStmt.execute()
    }

  }

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection("jdbc:mysql://192.168.91.180:3306/flinkdb?useSSL=false", "root", "123123")
    insertStmt = connection.prepareStatement("insert into sensor_temp(id,temp) value(?,?)")
    updateStmt = connection.prepareStatement("update sensor_temp set temp=? where id=?")

  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    connection.close()
  }
运行结果:

查询数据select * from sensor_temp;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745903.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号