基于 select 语句的 Flink 全量同步方案,适用于数据同步中的全量(T+1)同步场景,可以结合 Azkaban 或 DolphinScheduler 做定时调度。注意:该方案通过 JDBC 全表查询实现,并非基于 binlog 的增量 CDC。
1、maven 依赖(pom.xml)

<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*:*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>6</source>
                <target>6</target>
            </configuration>
        </plugin>
    </plugins>
</build>

2、MysqlReader
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Source function that performs a one-shot, full-table SELECT against MySQL
 * and emits each row as a {@code Tuple3<id, username, password>}.
 *
 * <p>This is a bounded, batch-style read (no binlog/CDC incremental capture);
 * it is intended for T+1 full-sync jobs triggered by an external scheduler.
 */
public class MysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;

    /**
     * Opens the JDBC connection and prepares the full-table query.
     * Called once per parallel source instance before {@link #run}.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }

    /**
     * Executes the query once and emits every row, then returns,
     * ending the (bounded) stream.
     */
    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> sourceContext) throws Exception {
        // try-with-resources so the ResultSet is always closed, even on failure
        try (ResultSet resultSet = ps.executeQuery()) {
            while (resultSet.next()) {
                Tuple3<Integer, String, String> tuple = new Tuple3<>();
                tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
                sourceContext.collect(tuple);
            }
        }
    }

    /**
     * Releases JDBC resources when the job is cancelled.
     * Statement is closed before the connection that owns it.
     */
    @Override
    public void cancel() {
        try {
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            // best-effort cleanup on cancellation; nothing useful to rethrow here
            e.printStackTrace();
        }
    }
}
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Sink function that writes each incoming {@code Tuple3<id, username, password>}
 * into {@code ods_flink_cdc_test.ods_t_test} via a prepared INSERT statement.
 */
public class MysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;

    /**
     * Opens the JDBC connection (once per parallel sink instance) and
     * prepares the parameterized INSERT statement.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");
        }
        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("完成");
    }

    /**
     * Inserts one record produced by the upstream reader.
     * Failures on individual rows are logged and skipped (best-effort sink).
     */
    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            // deliberate best-effort: a bad row should not fail the whole job
            e.printStackTrace();
        }
    }

    /**
     * Releases JDBC resources; statement closed before its owning connection.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
package com.flink.cdc.demo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Job entry point: wires {@link MysqlReader} (full-table SELECT source)
 * to {@link MysqlWriter} (INSERT sink) and executes the pipeline.
 */
public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
        // For submitting to a remote cluster from the IDE instead, use:
        // StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
        //     "D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Prefer not to hard-code parallelism here: a value of 8 requires the
        // cluster's configured parallelism/slots to be at least 8, otherwise
        // the job fails with a resource error.
        // env.setParallelism(8);
        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink cost MySQL data to write MySQL");
    }
}
5、本地运行
6、打成jar包进行上传
注意:flink版本要和maven里的版本一致 scala版本也要保持一致
7、运行


