我们知道,DataStream 是flink流式数据处理中一个非常重要的数据结构,可以将DataStream转换成许多其他的数据类型,进而做灵活的数据处理,同样基于DataStream 可以转为 Table ,这就非常厉害了;
举例来说,当通过文件读取的方式,将CSV文件读取成为DataStream 之后,再将DataStream 转换为Table,就可以使用Table下的各种API做数据上的进一步处理了,这就给诸如数据过滤,数据聚合统计等操作带来了非常大的便利性;
关于将 Table 转换成 DataStream相关理论知识总结
表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了;将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型;表作为流式查询的结果,是动态更新的;转换有两种转换模式:追加(Append)模式和撤回(Retract)模式
追加模式(Append Mode)用于表只会被插入(Insert)操作更改的场景 使用API:
DataStream撤回模式(Retract Mode)resultStream = tableEnv.toAppendStream(resultTable, Row.class);
用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作;得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete);
使用API:
DataStream将 DataStream 转换成表具体API> aggResultStream = tableEnv.toRetractStream(aggResultTable , Row.class);
对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API 做转换操作
DataStream默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来dataStream = ... Table sensorTable = tableEnv.fromDataStream(dataStream);
DataStreamdataStream = ... Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts,temperature");
下面看具体的代码
在resource目录下有一个CSV格式的文件,内容如下:
我们通过上面将 Table 转换成 DataStream 的方式将数据输出到控制台,具体代码如下:
import com.congge.source.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class TableApiDataStream1 {
public static void main(String[] args) throws Exception{
// 1. 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. 读入文件数据,得到DataStream
String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
DataStream inputStream = env.readTextFile(path);
DataStream dataStream = inputStream.map(line ->{
String[] fields = line.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
});
Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, pt.proctime");
tableEnv.toAppendStream(dataTable, Row.class).print("result");
env.execute();
}
}
运行上面的代码,观察控制台效果,



