Apache Flink Study Notes, Part 3: A Problem with EventTime Window Operations in the Flink 1.14.0 Table API

For more posts, visit: bbhhhh.github.io

A problem with EventTime window operations in the Flink 1.14.0 Table API

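I recently had some free time and picked Flink up again, going straight to the latest stable release, 1.14.0. It got off to a bad start: while doing a window aggregation with the Table API, I ran into the following problem.
When the window is defined on event time, converting the result to a DataStream and calling print() produces no console output at all; when the window is defined on processing time instead, everything works.

Windowing on event time with the DataStream API also works, so I was at a loss to explain it.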
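
As background on why the two time domains can behave differently, here is a small plain-Java sketch (not Flink code) of how an event-time tumbling window decides when to emit. It assumes a simplified model: the watermark is the largest timestamp seen so far minus the allowed out-of-orderness, and a window emits once the watermark reaches its end. The class and method names are invented for illustration. Under this model an event-time window emits nothing until later events push the watermark past the window end, whereas a processing-time window fires on the wall clock regardless of the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of event-time tumbling windows; names are hypothetical, this is not Flink code. */
final class EventTimeWindowSketch {

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /**
     * Feeds event timestamps in arrival order and returns "start-end:count"
     * for every window whose end the watermark has reached.
     */
    static List<String> firedWindows(long[] eventTimesMs, long sizeMs, long maxOutOfOrdernessMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> event count
        List<String> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimesMs) {
            long start = windowStart(ts, sizeMs);
            // Count the event unless its window has already been emitted (late data is dropped).
            if (start + sizeMs > watermark) {
                open.merge(start, 1, Integer::sum);
            }
            // Assumed watermark rule: largest timestamp seen minus the out-of-orderness bound.
            watermark = Math.max(watermark, ts - maxOutOfOrdernessMs);
            // Emit every open window whose end is now covered by the watermark.
            while (!open.isEmpty() && open.firstKey() + sizeMs <= watermark) {
                Map.Entry<Long, Integer> w = open.pollFirstEntry();
                fired.add(w.getKey() + "-" + (w.getKey() + sizeMs) + ":" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] stalled = {1000, 2000, 4000};
        long[] advancing = {1000, 2000, 4000, 6000, 12000};
        System.out.println(firedWindows(stalled, 5000, 2000));   // prints []
        System.out.println(firedWindows(advancing, 5000, 2000)); // prints [0-5000:3, 5000-10000:1]
    }
}
```

In the first call the watermark never reaches 5000, so the 0-5000 window stays open and nothing is printed. This only illustrates the general mechanism; it does not by itself identify what goes wrong in the job described here.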

The core dependencies in the pom are:

        
            org.apache.flink
            flink-java
            1.14.0
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.14.0
        
        
            org.apache.flink
            flink-clients_2.12
            1.14.0
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            1.14.0
        
        
            org.apache.flink
            flink-json
            1.14.0
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            1.14.0

        
        
            org.apache.flink
            flink-table-planner_2.12
            1.14.0

        
        
            org.apache.flink
            flink-streaming-scala_2.12
            1.14.0

        
        
            org.apache.flink
            flink-table-common
            1.14.0

        

The test code is:

package com.nokia.itms.flink.sql;

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.nokia.itms.flink.FlinkConsumer;
import com.nokia.itms.kafka.KafkaConfig;

public class CountEventsPerDeviceByTumbleWindowTableAPI extends FlinkConsumer {
    private static final Logger logger = LoggerFactory.getLogger(CountEventsPerDeviceByTumbleWindowTableAPI.class);

    
    public CountEventsPerDeviceByTumbleWindowTableAPI(String topic) {
        super(topic);

    }

    @Override
    public void run() {
        Properties kafkaProps = (Properties) KafkaConfig.getInstance().getKafkaProps().clone();
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.getClass().getSimpleName());
        logger.info("Event max out of orderness = {} {}",MAX_OUT_OF_ORDERNESS.getSize(),MAX_OUT_OF_ORDERNESS.getUnit());

        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv, bsSettings);


        String createTable = "\n CREATE TABLE rxpower_detail (\n" +
                "    actualTime BIGINT,\n" +
                "    ponInfo ROW(PonRXPower INT),\n" +
                "    deviceInfo ROW(deviceId STRING), \n" +
                "    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000,'yyyy-MM-dd HH:mm:ss')) ,\n" +
//                "    event_time AS TO_TIMESTAMP_LTZ(actualTime,3) ,\n" +
                "    WATERMARK FOR event_time AS event_time - INTERVAL '" +
                MAX_OUT_OF_ORDERNESS.getSize() +
                "' " +
                MAX_OUT_OF_ORDERNESS.getUnit() + " , \n" +
                " proc_time as PROCTIME() \n" +
                "    )\n" +
                " with ( \n" +
                "'connector' = 'kafka'" + ",\n" +
                "'topic' = '" + this.topic + "',\n" +
                "'properties.bootstrap.servers' = '" + kafkaProps.getProperty("bootstrap.servers")+ "',\n" +
                //"'properties.group.id' = '" + kafkaProps.getProperty("group.id")+ "',\n" +
                "'scan.startup.mode' = 'latest-offset'" + ",\n" +
                //"'scan.startup.mode' = 'group-offsets'" + ",\n" +
                "'format' = 'json'" + ",\n" +
                "'json.fail-on-missing-field' = 'false'" +",\n" +
                "'json.ignore-parse-errors' = 'true'" +"\n" +
                ")" +"\n" ;

        logger.debug(createTable);

        tableEnv.executeSql(createTable);

        String selectSql = "";

        selectSql = ""
                + "\ncreate temporary view tumble_windowed_result as "
                + "\n select window_start as w_s,window_end as w_e, window_time as w_t, deviceId, count(deviceId) as lc "
                + "\n from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(event_time), INTERVAL '5' SECOND))"
//                + " from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(proc_time), INTERVAL '5' SECOND))"
                + "\n group by window_start,window_end,window_time, deviceId  "
                +"\n"
                ;
        logger.info(selectSql);
        tableEnv.executeSql(selectSql);
//
        selectSql = "select * from tumble_windowed_result " ;

        logger.info(selectSql);

        Table table2 = tableEnv.sqlQuery(selectSql);
        DataStream<Row> resultStream = tableEnv.toDataStream(table2);

        resultStream.print();

        try {
            execEnv.execute("table api test");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

    }

    public static void main(String... args) {
        new CountEventsPerDeviceByTumbleWindowTableAPI("topic.periodic").run();
    }
}
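
To make the event_time computed column in the CREATE TABLE statement above easier to reason about, here is a plain-Java sketch (not Flink code) of what TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000,'yyyy-MM-dd HH:mm:ss')) appears to compute: integer division turns epoch milliseconds into whole seconds, the seconds are formatted as a string, and the string is parsed back into a timestamp without a zone. The class and method names are invented for illustration, and the ZoneId parameter is a stand-in for the time zone Flink uses when formatting.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Plain-Java imitation of the event_time expression; names are hypothetical, this is not Flink code. */
final class EventTimeColumnSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Stand-in for FROM_UNIXTIME: epoch seconds formatted as text in the given zone. */
    static String fromUnixtime(long epochSeconds, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), zone).format(FORMAT);
    }

    /** Stand-in for TO_TIMESTAMP: parses the text back into a zone-less timestamp. */
    static LocalDateTime toTimestamp(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    /** The whole expression: the millisecond part is lost in the integer division. */
    static LocalDateTime eventTime(long actualTimeMs, ZoneId zone) {
        return toTimestamp(fromUnixtime(actualTimeMs / 1000, zone));
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        // Both inputs fall in the same second, so they map to the same event time.
        System.out.println(eventTime(1000L, utc)); // prints 1970-01-01T00:00:01
        System.out.println(eventTime(1999L, utc)); // prints 1970-01-01T00:00:01
    }
}
```

The commented-out alternative in the table definition, TO_TIMESTAMP_LTZ(actualTime,3), takes the millisecond value directly and so does not go through this string round trip.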

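Running the code above and sending simulated test data to Kafka produces no console output. But if the line

from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(event_time), INTERVAL '5' SECOND))

is switched to the processing-time column, i.e. changed to

from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(proc_time), INTERVAL '5' SECOND))

then everything works and the console prints the aggregated counts as expected.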

In addition, if I switch Flink to 1.13.2 and use the Blink table planner, both event time and processing time work. The core dependencies in the pom after switching to 1.13.2 are:

        
            org.apache.flink
            flink-java
            1.13.2
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.13.2
        
        
            org.apache.flink
            flink-clients_2.12
            1.13.2
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            1.13.2
        
        
            org.apache.flink
            flink-json
            1.13.2
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            1.13.2

        
        
            org.apache.flink
            flink-table-planner_2.12
            1.13.2

        
        
            org.apache.flink
            flink-streaming-scala_2.12
            1.13.2

        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            1.13.2
        
        
            org.apache.flink
            flink-table-common
            1.13.2

        

I do not know whether this is a problem in Flink 1.14.0 itself.

EOF
