栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JAVA实现Flink Table 基于事件时间的滑动窗口

JAVA实现Flink Table 基于事件时间的滑动窗口

JAVA实现Flink Table基于事件时间的滑动窗口代码样例

package org.fenghua.example.table.windos;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;


public class KafkaSlidingEventTimeWindos {
    private final static String SOURCE_TOPIC = "topic3";
    private final static String ZOOKEEPER_ConNECT = "127.0.0.1:2181";
    private final static String metaDATA_BROKER_LIST = "127.0.0.1:9092";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        .topic(SOURCE_TOPIC)
                        .startFromLatest()
                        .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                        .property("bootstrap.servers", metaDATA_BROKER_LIST)
        )
                .withFormat(
                        new Json()
                                .schema(
                                        org.apache.flink.table.api.Types.ROW(
                                                new String[]{"id", "product", "amount","createTime"},
                                                new TypeInformation[]{
                                                        Types.LONG,
                                                        Types.STRING,
                                                        Types.INT,
                                                        Types.SQL_TIMESTAMP,
                                                }))
                                .failonMissingField(true)
                )
                .withSchema(
                        new Schema()
                                .field("id", Types.LONG)
                                .field("product", Types.STRING)
                                .field("amount", Types.INT)
                                .field("rowTime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("createTime")
                                        .watermarksPeriodicBounded(1000))
                )
                .inAppendMode()
                .registerTableSource("sourceTable");

        Table table1 = tEnv.sqlQuery("select id, rowTime from sourceTable");
        Table table2 = table1
                .window(Slide.over("20.second").every("10.second").on("rowTime").as("w"))
                .groupBy("id,w")
                .select("id,id.count");
        tEnv.toRetractStream(table2, Row.class).print();
        env.execute(" test ");

    }

}

代码仓库https://gitee.com/xuguoxi/FlinkLearn/blob/master/src/main/java/org/fenghua/example/table/windos/KafkaSlidingEventTimeWindos.javahttps://gitee.com/xuguoxi/FlinkLearn/blob/master/src/main/java/org/fenghua/example/table/windos/KafkaSlidingEventTimeWindos.java仓库里面有其他样例代码

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583929.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号