在上一篇我们演示了如何使用Flink 的Table Api 读取文件数据,并过滤特定字段的数据,本篇在上一篇的基础上,将从CSV文件中读取的数据重新输出到一个新的CSV文件中;
在实际业务场景下,也有不少类似的操作,即源文件放在某个文件目录下,通过flink程序读取,然后按照业务规则对读取进来的数据做适当的转换等类似ETL的操作,之后再输出到其他的地方,可以是文件系统,或者数据库、kafka、es等;
本例演示将数据写出到一个新的CSV文件中;
前置准备
准备一个CSV格式的文件,内容如下:
核心代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class Table443 {
public static void main(String[] args) throws Exception{
// 1. 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. 表的创建:连接外部系统,读取数据
// 2.1 读取文件
String filePath = "E:\code-self\flink_study\src\main\resources\sensor.txt";
tableEnv.connect( new FileSystem().path(filePath))
.withFormat( new Csv())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable");
Table inputTable = tableEnv.from("inputTable");
// inputTable.printSchema();
// tableEnv.toAppendStream(inputTable, Row.class).print();
// 3. 查询转换
// 3.1 Table API
// 简单转换
Table resultTable = inputTable.select("id, temp")
.filter("id === 's2'");
// 聚合统计
Table aggTable = inputTable.groupBy("id")
.select("id, id.count as count, temp.avg as avgTemp");
// 3.2 SQL
tableEnv.sqlQuery("select id, temp from inputTable where id = 's2'");
Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
//输出到文件
String outPath = "E:\code-self\flink_study\src\main\resources\sensor_out.txt";
tableEnv.connect( new FileSystem().path(outPath))
.withFormat( new Csv())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("outTable");
//将源头的数据写出到指定的注册表中区
System.out.println("准备将数据写出到另一个文件");
resultTable.insertInto("outTable");
System.out.println("写出成功");
env.execute();
}
}
运行这段代码,运行完毕之后,如果执行成功,可以在resources目录下检查新生成的文件,可以看到,通过程序过滤出id为s2的这条数据了



