栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkSQL Table API和SQL(三)

FlinkSQL Table API和SQL(三)

一、回顾

上一节介绍了使用Flink Table API 和 SQL在source端和sink连接外部文件系统,以及混用Table API 和 SQL进行一些简单的查询转换。实际生产环境中使用Flink连接外部系统的场景数不胜数,可能是外部文件系统的多种文件格式,如 CSV、Apache Parquet、Apache Avro等;可能是存储系统,如 JDBC、Apache Hbase、Apache Cassandra、Elasticsearch等,或消息队列系统,如 Apache Kafka、RabbitMQ等。

本节将继续介绍使用Flink Table API 和 SQL连接一些常用的外部系统。

二、更新模式介绍:

对于流式查询,需要声明如何在表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(Uadate Mode)指定:

追加(Append)模式:表只做插入操作,和外部连接器只交换插入(Insert)消息

撤回(Retract)模式:表和外部连接器交换添加(Add)和撤回(Retract)消息。插入操作(Insert)编码为Add消息;删除(Delete)编码为Retract消息;更新(Update)编码为上一条的Retract和下一条的Add消息

更新插入(Upsert)模式:更新和插入都被编码为Upsert消息;删除编码为Delete消息

三、读写Kafka

Kafka作为消息队列,和文件系统类似的,只能往里追加数据,不能修改数据。

1、使用Table API读写Kafka 1.1 我们经常使用Table API 的 tableEnv.connect(new Kafka()) 来连接Kafka,然后在其中指定Kafka的版本、topic和一些连接参数。

具体请参考下面的demo:消费sensor topic 的数据,并进行一些简单查询转换,然后将数据写到sinktest topic。

    public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.连接Kafka,消费sensor topic的数据
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("sensor")
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
                ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                ).createTemporaryTable("inputTable");

        //4.查询转换
        Table inputTable = tableEnv.from("inputTable");

        //新版Table API
        Table resultTable = inputTable.select($("id"), $("timestamp"), $("temp"))
                .filter($("id").isEqual("sensor_6"));

        

        //6.建立Kafka连接,输出到sinktest topic下
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("sinktest")
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
                ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("outputTable");

        //查询结果集输出到outputTable
        resultTable.insertInto("outputTable");

        tableEnv.execute("");
    }
1.2 本地测试环境开启zookeeper 和 Kafka 服务,创建sensor 和 sinktest topic:
kafka-topic.sh --zookeeper localhost:2181 --create --topic sensor --partitions 1 --replication-factor 1

kafka-topic.sh --zookeeper localhost:2181 --create --topic sinktest --partitions 1 --replication-factor 1
1.3 启动Kafka生产者和消费者console:
kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest
1.4 启动Flink程序,并在Kafka生产者console输入数据,观察消费者端console输出:

输入

>sensor_1,1547718199,35.8
>sensor_6,1547718201,15.4
>sensor_7,1547718202,6.7
>sensor_10,1547718205,38.1
>sensor_6,1547718209,34.5

输出

sensor_6,1547718201,15.4

sensor_6,1547718209,34.5

观察消费者端输出,只消费到id为sensor_6的数据,说明程序连接Kafka并进行简单转换没有问题。

2、使用Flink SQL读写Kafka

输入端消费Kafka,输出端将数据生产到Kafka,我们都使用Flink SQL 来完成:

public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.连接Kafka,消费sensor topic的数据
        String inputTable = "sensor";
        String dropSql = "DROP TABLE IF EXISTS " + inputTable;
        String kafkaTableDDL
                = "CREATE TABLE " + inputTable + " (n" +
                "    id String,n" +
                "    ts BIGINT,n" +
                "    temp DOUBLEn" +
                ") WITH (n" +
                "   'connector.type' = 'kafka',n" +
                "   'connector.version' = 'universal',n" +
                "   'connector.topic' = 'sensor',n" +
                "   'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
                "   'format.type' = 'csv',n" +
                "   'update-mode' = 'append'n" +
                ")";

        //执行输入端的建表语句
        tableEnv.executeSql(dropSql);
        tableEnv.executeSql(kafkaTableDDL);

        //6.建立Kafka连接,输出到不同的topic下
        String outputTable = "sensor_2";
        String dropSql2 = "DROP TABLE IF EXISTS " + outputTable;
        String kafkaTableDDL2
                = "CREATE TABLE " + outputTable + " (n" +
                "    id String,n" +
                "    ts BIGINT,n" +
                "    temp DOUBLEn" +
                ") WITH (n" +
                "   'connector.type' = 'kafka',n" +
                "   'connector.version' = 'universal',n" +
                "   'connector.topic' = 'sinktest',n" +
                "   'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
                "   'format.type' = 'csv',n" +
                "   'update-mode' = 'append'n" +
                ")";

        //执行输出端的建表语句
        tableEnv.executeSql(dropSql2);
        tableEnv.executeSql(kafkaTableDDL2);

        //简单查询转换,将结果输出到表sensor_2,即输出到Kafka 的sinktest topic
        String insertSql = "INSERT INTO sensor_2 " +
                "SELECT id , ts , temp FROM sensor WHERe id='sensor_6'";

        tableEnv.executeSql(insertSql);
    }
3、使用upsert-kafka connector读写Kafka

在某些场景中,比如GROUP BY聚合之后的结果,需要去更新之前的结果值。这个时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。在Flink1.11中,可以通过 flink-cdc-connectors 项目提供的 changelog-json format来实现该功能。

在Flink1.12版本中, 新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。

以下示例代码展示了如何利用upsert-kafka connector以upsert模式向kafka中生产数据,

注意:需要添加flink-connector-kafka相关依赖:

		
            org.apache.flink
            flink-connector-kafka_2.12
            ${flink.version}
        

示例代码:

public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.连接Kafka,消费test topic的数据
        String inputTable = "sensor";
        String dropSql = "DROP TABLE IF EXISTS " + inputTable;
        String kafkaTableDDL
                = "CREATE TABLE " + inputTable + " (n" +
                "    id String,n" +
                "    ts BIGINT,n" +
                "    temp DOUBLEn" +
                ") WITH (n" +
                "   'connector.type' = 'kafka',n" +
                "   'connector.version' = 'universal',n" +
                "   'connector.topic' = 'test',n" +
                "   'connector.properties.group.id' = 'main-group',n" +
                "   'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
                "   'format.type' = 'csv',n" +
                "   'update-mode' = 'append'n" +
                ")";

        //执行输入端的建表语句
        tableEnv.executeSql(dropSql);
        tableEnv.executeSql(kafkaTableDDL);

        //6.建立Kafka连接,输出到不同的topic下
        String outputTable = "sensor_2";
        String dropSql2 = "DROP TABLE IF EXISTS " + outputTable;
        String kafkaTableDDL2
                = "CREATE TABLE " + outputTable + " (n" +
                "    id String,n" +
                "    cnt BIGINT,n" +
                "    avgTemp DOUBLE,n" +
                "    PRIMARY KEY (id) NOT ENFORCED n" +
                ") WITH (n" +
                "    'connector' = 'upsert-kafka',n" +
                "    'topic' = 'sink_test',n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',n" +
                "    'key.format' = 'json',n" +
                "    'key.json.ignore-parse-errors' = 'true',n" +
                "    'value.format' = 'json',n" +
                "    'value.json.fail-on-missing-field' = 'false'n" +
                ")";

        //执行输出端的建表语句
        tableEnv.executeSql(dropSql2);
        tableEnv.executeSql(kafkaTableDDL2);

        //简单聚合查询,以upsert模式将结果输出到表sensor_2,即输出到Kafka 的sink_test topic
        String insertSql = "INSERT INTO sensor_2 " +
                "SELECT id , count(id) as cnt , avg(temp)  as avgTemp FROM sensor GROUP BY id";


        tableEnv.executeSql(insertSql);
		//查询表sensor_2
        TableResult result = tableEnv.executeSql("select * from sensor_2");
        result.print();
    }

启动程序,并向test topic中生产数据:

>sensor_1,1547718199,35.8
>sensor_1,1547718199,34.2 

控制台输出结果如下:

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
+----+--------------------------------+----------------------+--------------------------------+
| op |                             id |                  cnt |                        avgTemp |
+----+--------------------------------+----------------------+--------------------------------+
| +I |                       sensor_1 |                    1 |                           35.8 |
| -U |                       sensor_1 |                    1 |                           35.8 |
| +U |                       sensor_1 |                    2 |                           35.0 |

其中op列展示了数据是新增还是更新的结果, +I 表示数据新增;-U 表示数据删除;+U 表示数据更新;当生产第一条数据时候,id为sensor_1的数据count计数为1,平均温度为35.8,生产第二条数据以后,原来第一条数据删除,count更新为2,平均温度更新为35.0。

upsert-kafka connector的一些配置项如下:

topic:必选。用于读取和写入的 Kafka topic 名称。

properties.bootstrap.servers:必选。以逗号分隔的 Kafka brokers 列表。

key.format:必选。用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 ‘csv’、‘json’、‘avro’。

value.format:必选。用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 ‘csv’、‘json’、‘avro’。

properties.*: 可选。该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

value.fields-include: 可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。

key.fields-prefix:可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY。

三、sink端输出到Elasticsearch

输出到es支持upsert模式,当有数据需要更新时,根据es 的 _id进行更新,需要添加如下依赖:

 
        
            org.apache.flink
            flink-connector-elasticsearch6_2.11
            1.12.1
        

如下示例代码使用Table API消费Kafka指定topic的数据,进行简单聚合查询后,以upsert模式输出到Elasticsearch:

public static void main(String[] args) throws Exception {
        // 1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、设置并行度
        env.setParallelism(1);

        // 3、创建表执行环境
        EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, oldStreamSettings);

        //4.连接Kafka,消费数据
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("sensor")
                        .property("zookeeper.connect", "localhost:2181")
                        .property("bootstrap.servers", "localhost:9092")
                ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                ).createTemporaryTable("inputTable");

        //5.聚合统计
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();

        //新版Table API
        Table aggTable = inputTable.groupBy($("id"))
                .select($("id").count().as("count"), $("temp").avg().as("avgTemp"));

        //旧版Table API
        // Table aggTable = inputTable.groupBy("id").select("id ,id.count as count, temp.avg as avgTemp");

        //6.连接es,聚合统计结果输出到es
        tableEnv.connect(new Elasticsearch()
                        .version("6")      //es版本
                        .host("127.0.0.1", 9200, "http")      //连接地址
                        .index("sensor")        //索引名
                        .documentType("temp")   //文档类型
                        .bulkFlushInterval(2000)  //批量插入到es的时间间隔
                )
                .inUpsertMode()
                .withFormat(new Json())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("count", DataTypes.BIGINT())
                        .field("avgTemp", DataTypes.DOUBLE()))
                .createTemporaryTable("esOutputTable");

        //聚合结果输出到esOutputTable
        aggTable.insertInto("esOutputTable");

        tableEnv.execute("outPut2ES");
    }
四、sink端输出到MySQL

引入 flink-connector-jdbc 依赖:

		
            org.apache.flink
            flink-connector-jdbc_${scala.binary.version}
            ${flink.version}
        

如下代码source端和sink端都使用SQL操作,消费kafka指定topic数据,并进行简单聚合查询,以upsert模式将聚合统计结果输出到MySQL:

public static void main(String[] args) throws Exception {
        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        //2.创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.连接Kafka,读取数据
        String kafkaTableDDL
                = "CREATE TABLE  inputTable (n" +
                "    id String,n" +
                "    ts BIGINT,n" +
                "    temp DOUBLEn" +
                ") WITH (n" +
                "   'connector.type' = 'kafka',n" +
                "   'connector.version' = 'universal',n" +
                "   'connector.topic' = 'sensor',n" +
                "   'connector.properties.bootstrap.servers' = 'localhost:9092',n" +
                "   'format.type' = 'csv',n" +
                "   'update-mode' = 'append'n" +
                ")";

        //执行输入端的建表语句
        tableEnv.executeSql(kafkaTableDDL);

        //4.sink端连接mysql,将数据进行输出
        String sinkDDL =
                "create table sensor_count (" +
                        " id varchar(20) not null, " +
                        " cnt bigint not null, " +
                        " avg_temp double not null " +
                        ") with (" +
                        " 'connector.type' = 'jdbc', " +
                        " 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
                        " 'connector.table' = 'sensor_count', " +
                        " 'connector.driver' = 'com.mysql.cj.jdbc.Driver', " +
                        " 'connector.username' = 'root', " +
                        " 'connector.password' = 'admin', " +
                        " 'connector.write.flush.interval' = '2s' )";

        //执行ddl创建表
        tableEnv.executeSql(sinkDDL);

        //5.聚合数据插入jdbc表
        String insertSql = "INSERT INTO sensor_count " +
                "SELECT id, count(id) as cnt, avg(temp) as avgTemp FROM inputTable GROUP BY id";

        tableEnv.executeSql(insertSql);
    }

Kafka生产端console输入以下数据:

>sensor_1,1547718199,35.8
>sensor_1,1547718199,34.2        
>sensor_6,1547718212,37.1
>sensor_6,1547718212,32.9

MySQL的 sensor_count 表中数据发生更新:

五、总结

本节介绍了使用 Flink Table API 和 SQL连接常见的外部系统进行操作,并放出了使用Table API或者SQL 操作Kafka、ES、MySQL的demo,自己在实践学习中也踩了不少坑。总之一句话,实践出真知。光是理论的学习是没用的,只有实实际际敲了代码,代入理论去思考,才会有一定感悟,而每一次报错,实际上就是经验的积累。博文中有错误或不足的地方,还清兄弟们指正。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711196.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号