前言:
这里使用JDBCInputFormat类专门实现Flink读取MySQL数据的功能。
依赖:
org.apache.flink flink-java 1.14.0 org.apache.flink flink-streaming-java_2.12 1.14.0 provided org.apache.flink flink-clients_2.12 1.14.0 org.apache.flink flink-jdbc_2.12 1.10.0 provided
导包:
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row;
java代码:
public class FromMySQL {
public static void main(String[] args) throws Exception {
JDBCInputFormat input = new JDBCInputFormat.JDBCInputFormatBuilder()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456")
.setQuery("select * from people")
//设置获取的数据的类型
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
.finish();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource input1 = env.createInput(input);
input1.map(new MapFunction() {
@Override
public String map(Row row) throws Exception {
return row.toString();
}
}).print();
//离线批处理的print(),count(),collect()等都具有execute()的功能。即如果使用了这些就不需要提交execute()了
//如果是流处理则必须提交execute()
}
}
MySQL数据:
运行结果:



