栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flin Table Api 之读取dataStream数据

Flin Table Api 之读取dataStream数据

前言

我们知道,DataStream 是flink流式数据处理中一个非常重要的数据结构,可以将DataStream转换成许多其他的数据类型,进而做灵活的数据处理,同样基于DataStream 可以转为 Table ,这就非常厉害了;

举例来说,当通过文件读取的方式,将CSV文件读取成为DataStream 之后,再将DataStream 转换为Table,就可以使用Table下的各种API做数据上的进一步处理了,这就给诸如数据过滤,数据聚合统计等操作带来了非常大的便利性;

关于将 Table 转换成 DataStream相关理论知识总结

表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了;将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型;表作为流式查询的结果,是动态更新的;转换有两种转换模式:追加(Append)模式和撤回(Retract)模式

追加模式(Append Mode)

用于表只会被插入(Insert)操作更改的场景 使用API:

DataStream resultStream = tableEnv.toAppendStream(resultTable, Row.class);
撤回模式(Retract Mode)

用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作;得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete);

使用API:

DataStream> aggResultStream = tableEnv.toRetractStream(aggResultTable , Row.class);

将 DataStream 转换成表具体API

对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API 做转换操作

DataStream dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);

默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来
DataStream dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts,temperature");

下面看具体的代码

在resource目录下有一个CSV格式的文件,内容如下:

 

我们通过上面将 Table 转换成 DataStream 的方式将数据输出到控制台,具体代码如下:

import com.congge.source.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableApiDataStream1 {

    public static void main(String[] args) throws Exception{

        // 1. 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. 读入文件数据,得到DataStream
        String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
        DataStream inputStream = env.readTextFile(path);

        DataStream dataStream = inputStream.map(line ->{
            String[] fields = line.split(",");
            return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
        });

        Table dataTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp, pt.proctime");
        tableEnv.toAppendStream(dataTable, Row.class).print("result");

        env.execute();
    }

}

运行上面的代码,观察控制台效果,

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761313.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号