flink的官网,提供了JDBC的连接实例,根据编程
首先就是添加依赖
mysql
mysql-connector-java
5.1.48
commons-dbutils
commons-dbutils
1.6
org.apache.flink
flink-connector-jdbc_2.12
1.14.0
添加完依赖后,根据flink的官网进行编程,前提条件:
1.本地安装有mysql
2.在mysql里面有相对应的表,来存储flink生成到的数据
3.nc(脑残)开启了数据流的产生,这个可不开启,只是为了演示方便而用的
create table books ( id int , name varchar(20) ); -- 在mysql的对应库中创建表格,我是在test库中创建了一个book表
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource lines = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator> mapped = lines.map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
String[] split = value.split(",");
return Tuple2.of(Integer.parseInt(split[0]), split[1]);
}
});
mapped.addSink(JdbcSink.sink(
"insert into books values(?,?)",
(ps, t) -> {
ps.setInt(1, t.f0); //读取对应的数据,写到sql的books表中
ps.setString(2, t.f1);
// ps.setString(3, t.f2);
// ps.setDouble(4, t.f3);
// ps.setInt(5, t.f4);
},
JdbcExecutionOptions.builder() //JDBC的执行选项
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() //JDBC的连接选项
.withUrl("jdbc:mysql://localhost:3306/test?characterEncoding=utf8")
.withDriverName("com.mysql.jdbc.Driver") //这个mysql驱动一定要写这个,别用官网写的那个
.withUsername("root")
.withPassword("root")
// .withConnectionCheckTimeoutSeconds(2)
.build()));
env.execute();
运行成功的那一刻,爽到家了.



