在上一篇我们通过代码演示,通过读取kafak的数据,然后将数据再次写出到kafka中,本篇将分享使用Flink Table Api 如何将读取到的文件的数据写出到mysql ;
业务应用在某些场景下,通过读取源数据后,需要经过flink程序的处理,然后将数据持久化存储到数据库、或es等,作为后续其他业务继续使用
代码演示前置准备,在resource目录下,准备一个测试用的csv文件,作为flink读取的数据的源文件,内容如下:
本例,我们将沿用上一篇的案例逻辑,通过读取csv文件,再通过Flink Table Api 的使用将读取到的数据进行聚合统计,最后将统计结果写出到mysql中,因此需要提前创建一张表;
如下是提前创建好的空表,这里有2个字段,一个是id,一个是统计结果值;
1、添加maven依赖
org.apache.flink flink-jdbc_2.121.10.1
2、核心代码如下
port org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
public class TableMysql {
public static void main(String[] args) throws Exception{
// 1. 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. 表的创建:连接外部系统,读取数据
// 2.1 读取文件
String filePath = "E:\code-self\flink_study\src\main\resources\sensor.txt";
tableEnv.connect( new FileSystem().path(filePath))
.withFormat( new Csv())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
// inputTable.printSchema();
// tableEnv.toAppendStream(inputTable, Row.class).print();
// 3. 查询转换
// 3.1 Table API
// 简单转换
Table resultTable = inputTable.select("id, temp")
.filter("id === 's2'");
// 聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 3.2 SQL
tableEnv.sqlQuery("select id, temp from inputTable where id = 's2'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");
//将数据写出到mysql
String sinkDDL=
"create table jdbcOutputTable (" +
" id varchar(20) not null, " + " cnt bigint not null " + ") with (" + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://IP:3306/bank1', " + " 'connector.table' = 'sensor_count', " + " 'connector.driver' = 'com.mysql.jdbc.Driver', " + " 'connector.username' = 'root', " + " 'connector.password' = 'root' )";
tableEnv.sqlUpdate(sinkDDL) ;
// 执行 DDL创建表
sqlAggTable.insertInto("jdbcOutputTable");
env.execute();
}
}
运行这段程序,然后观察数据库中表的数据,可以看到数据成功写入到mysql



