- 一.Sink之JDBC概述
- 二.pom文件配置
- 三.MySQL配置
- 四.编写Java代码
- 五.运行Flink程序查看数据
- 参考:
Flink的Sink支持的数据库:
Bahir中支持的数据库:
从上两图可以看到,Flink的Sink并支持类似MySQL的这种关系型数据库,那么如果我需要通过Flink连接MySQL,该如何操作呢?
这个时候我们可以使用Flink Sink的JDBC连接。
二.pom文件配置此处,我本地MySQL版本是 8.0.19
三.MySQL配置mysql mysql-connector-java8.0.19
新建数据库及表
CREATE DATAbase `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci; CREATE TABLE `sensor_temp` ( `id` varchar(32) NOT NULL, `temp` double NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;四.编写Java代码
package org.flink.sink;
import org.flink.beans.SensorReading;
import org.example.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkTest4_Jdbc {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
// DataStream inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt");
//
// // 转换成SensorReading类型
// DataStream dataStream = inputStream.map(line -> {
// String[] fields = line.split(",");
// return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
// });
DataStream dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());
dataStream.addSink(new MyJdbcSink());
env.execute();
}
// 实现自定义的SinkFunction
public static class MyJdbcSink extends RichSinkFunction {
// 声明连接和预编译语句
Connection connection = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
}
// 每来一条数据,调用连接,执行sql
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// 直接执行更新语句,如果没有更新那么就插入
updateStmt.setDouble(1, value.getTemperature());
updateStmt.setString(2, value.getId());
updateStmt.execute();
if( updateStmt.getUpdateCount() == 0 ){
insertStmt.setString(1, value.getId());
insertStmt.setDouble(2, value.getTemperature());
insertStmt.execute();
}
}
@Override
public void close() throws Exception {
insertStmt.close();
updateStmt.close();
connection.close();
}
}
}
五.运行Flink程序查看数据
mysql> select * from sensor_temp; +-----------+-------------------+ | id | temp | +-----------+-------------------+ | sensor_3 | 65.31089123002162 | | sensor_10 | 20.23454807781744 | | sensor_4 | 79.87349739590283 | | sensor_1 | 68.79742249825429 | | sensor_2 | 44.1766638371653 | | sensor_7 | 99.47000620947128 | | sensor_8 | 68.7360059804266 | | sensor_5 | 69.9135258264366 | | sensor_6 | 38.85722751176939 | | sensor_9 | 69.97758295030204 | +-----------+-------------------+ 10 rows in set (0.00 sec) mysql>参考:
- https://www.bilibili.com/video/BV1qy4y1q728
- https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae



