
Apache Flink Study Notes 4 - A Problem Encountered with Over Aggregation in the Flink Table API


For more content, visit: bbhhhh.github.io

A Problem Encountered with Flink Table API Over Aggregation

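While learning Over Aggregation in the Flink Table API recently, I ran into another odd problem: on Flink 1.13.2, the query throws an exception when the ORDER BY field is of type TIMESTAMP_LTZ, but works normally when the field is of type TIMESTAMP.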
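As background, here is a minimal sketch in plain java.time (not Flink code) of how the two column definitions compared in this note differ. It assumes that TO_TIMESTAMP_LTZ(actualTime, 3) interprets the value as epoch milliseconds, and that FROM_UNIXTIME formats whole epoch seconds as wall-clock time in the session time zone. The class name, the sample value, and the Asia/Shanghai zone are hypothetical and chosen only for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampKindsSketch {

    // Roughly what TO_TIMESTAMP_LTZ(actualTime, 3) represents:
    // an absolute instant, with the milliseconds preserved.
    static Instant asTimestampLtz(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Roughly what TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000)) represents:
    // the integer division drops the milliseconds, and the result is a
    // wall-clock value in the session time zone with no zone attached.
    static LocalDateTime asTimestamp(long epochMillis, ZoneId sessionZone) {
        long epochSeconds = epochMillis / 1000;
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), sessionZone);
    }

    public static void main(String[] args) {
        long actualTime = 1_630_000_000_123L;      // hypothetical sample value
        ZoneId zone = ZoneId.of("Asia/Shanghai");  // hypothetical session time zone
        System.out.println("TIMESTAMP_LTZ-like: " + asTimestampLtz(actualTime));
        System.out.println("TIMESTAMP-like:     " + asTimestamp(actualTime, zone));
    }
}
```

The two results describe the same moment, but the TIMESTAMP-like value has lost its sub-second part and only makes sense together with the session time zone, which is one reason to prefer the TIMESTAMP_LTZ definition when it works.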

The test code is as follows:

package com.nokia.itms.flink.sql;

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.nokia.itms.flink.FlinkConsumer;
import com.nokia.itms.kafka.KafkaConfig;

public class PrintLowPowerDevicesByOverAggregationTableAPI extends FlinkConsumer {
    private static final Logger logger = LoggerFactory.getLogger(PrintLowPowerDevicesByOverAggregationTableAPI.class);

    
    public PrintLowPowerDevicesByOverAggregationTableAPI(String topic) {
        super(topic);

    }

    @Override
    public void run() {
        Properties kafkaProps = (Properties) KafkaConfig.getInstance().getKafkaProps().clone();
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.getClass().getSimpleName());
        logger.info("Event max out of orderness = {} {}",MAX_OUT_OF_ORDERNESS.getSize(),MAX_OUT_OF_ORDERNESS.getUnit());

        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv, bsSettings);
        logger.info("before local tz={}",tableEnv.getConfig().getLocalTimeZone().toString());


        String createTable = "\n CREATE TABLE rxpower_detail (\n" +
                "    actualTime BIGINT,\n" +
                "    ponInfo ROW(PonRXPower INT),\n" +
                "    deviceInfo ROW(deviceId STRING), \n" +
                "    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000)) ,\n" +
                // !!! An event-time attribute produced by the TO_TIMESTAMP_LTZ function below
                // cannot be used in an OVER aggregation; it throws the exception
                // "OVER windows' ordering in stream mode must be defined on a time attribute". It may be a bug.
                // !!!
//                "    event_time AS TO_TIMESTAMP_LTZ(actualTime,3) ,\n" +
                "    WATERMARK FOR event_time AS event_time - INTERVAL '" +
                  MAX_OUT_OF_ORDERNESS.getSize() +
                "' " +
                MAX_OUT_OF_ORDERNESS.getUnit() + " , \n" +
                " proc_time as PROCTIME() \n" +
                "    )\n" +
                " with ( \n" +
                "'connector' = 'kafka'" + ",\n" +
                "'topic' = '" + this.topic + "',\n" +
                "'properties.bootstrap.servers' = '" + kafkaProps.getProperty("bootstrap.servers")+ "',\n" +
                //"'properties.group.id' = '" + kafkaProps.getProperty("group.id")+ "',\n" +
                "'scan.startup.mode' = 'latest-offset'" + ",\n" +
                //"'scan.startup.mode' = 'group-offsets'" + ",\n" +
                "'format' = 'json'" + ",\n" +
                "'json.fail-on-missing-field' = 'false'" +",\n" +
                "'json.ignore-parse-errors' = 'true'" +"\n" +
                ")" +"\n" ;

        logger.debug(createTable);
        tableEnv.executeSql(createTable);

        String selectSql = "";

        selectSql = ""
                + "\ncreate temporary view tumble_windowed_result as "
                + "\n select window_start as w_s,window_end as w_e,window_time as w_t,deviceId, count(deviceId) as lc "
                + "\n from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(event_time), INTERVAL '5' SECOND))"
                + "\n group by window_start,window_end, window_time,deviceId "
                + "\n"
                ;

        logger.info(selectSql);

        tableEnv.executeSql(selectSql);
        selectSql = "" +
                "\n select w_s, w_e, w_t,deviceId, lc, count(deviceId) over " +
                "\n ( PARTITION BY deviceId " +
                "\n order by w_t  " +
                "\n range between INTERVAL '10' second PRECEDING AND CURRENT ROW )" +
                "\n from tumble_windowed_result " +
                "\n";
        logger.info(selectSql);


        Table table2 = tableEnv.sqlQuery(selectSql);

        // toDataStream(Table) yields a stream of Row, so avoid the raw DataStream type.
        DataStream<Row> resultStream = tableEnv.toDataStream(table2);


        resultStream.print();

        try {
            execEnv.execute("table api test");
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

    }

    public static void main(String... args) {
        new PrintLowPowerDevicesByOverAggregationTableAPI("topic.periodic").run();
    }
}

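When the code above is run with

event_time AS TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000))

as the definition of the event_time field, it runs normally and prints results.

However, if the definition is changed to

event_time AS TO_TIMESTAMP_LTZ(actualTime,3)

running the code throws the following error: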

Exception in thread "main" org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:159)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:104)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:434)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:358)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
	at com.nokia.itms.flink.sql.PrintLowPowerDevicesByOverAggregationTableAPI.run(PrintLowPowerDevicesByOverAggregationTableAPI.java:133)
	at com.nokia.itms.flink.sql.PrintLowPowerDevicesByOverAggregationTableAPI.main(PrintLowPowerDevicesByOverAggregationTableAPI.java:147)

Following the error log to the source code, the relevant check is:

        if (orderKeyType instanceof TimestampType
                && ((TimestampType) orderKeyType).getKind() == TimestampKind.ROWTIME) {
            rowTimeIdx = orderKey;
        } else if (orderKeyType instanceof LocalZonedTimestampType
                && ((LocalZonedTimestampType) orderKeyType).getKind() == TimestampKind.PROCTIME) {
            rowTimeIdx = -1;
        } else {
            throw new TableException(
                    "OVER windows' ordering in stream mode must be defined on a time attribute.");
        }

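The logic here is puzzling: why does it not handle a LocalZonedTimestampType whose kind is TimestampKind.ROWTIME, and instead throw the exception directly? With no other option, I had to fall back to the TO_TIMESTAMP function.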
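To make the gap concrete, here is a minimal, self-contained sketch that mirrors the branch structure of the check quoted above. The types are hypothetical stand-ins written only for this illustration, not Flink's real classes, and the method name resolveRowTimeIdx and the IllegalStateException are likewise assumptions (the real code throws TableException).

```java
public class OverOrderKeyCheckSketch {

    // Hypothetical stand-ins for Flink's logical types; not the real classes.
    enum TimestampKind { REGULAR, ROWTIME, PROCTIME }

    interface LogicalType {}

    static final class TimestampType implements LogicalType {
        final TimestampKind kind;
        TimestampType(TimestampKind kind) { this.kind = kind; }
    }

    static final class LocalZonedTimestampType implements LogicalType {
        final TimestampKind kind;
        LocalZonedTimestampType(TimestampKind kind) { this.kind = kind; }
    }

    // Same branches as the quoted check: returns the order key index for a
    // TIMESTAMP rowtime attribute, -1 for a TIMESTAMP_LTZ proctime attribute,
    // and throws for every other combination.
    static int resolveRowTimeIdx(LogicalType orderKeyType, int orderKey) {
        if (orderKeyType instanceof TimestampType
                && ((TimestampType) orderKeyType).kind == TimestampKind.ROWTIME) {
            return orderKey;
        } else if (orderKeyType instanceof LocalZonedTimestampType
                && ((LocalZonedTimestampType) orderKeyType).kind == TimestampKind.PROCTIME) {
            return -1;
        }
        throw new IllegalStateException(
                "OVER windows' ordering in stream mode must be defined on a time attribute.");
    }

    public static void main(String[] args) {
        // TIMESTAMP + ROWTIME: accepted as event time.
        System.out.println(resolveRowTimeIdx(new TimestampType(TimestampKind.ROWTIME), 2));
        // TIMESTAMP_LTZ + PROCTIME: accepted as processing time.
        System.out.println(resolveRowTimeIdx(new LocalZonedTimestampType(TimestampKind.PROCTIME), 2));
        // TIMESTAMP_LTZ + ROWTIME: matches neither branch, so it is rejected.
        try {
            resolveRowTimeIdx(new LocalZonedTimestampType(TimestampKind.ROWTIME), 2);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, a TIMESTAMP_LTZ rowtime attribute matches neither branch, which corresponds to the case where event_time is defined with TO_TIMESTAMP_LTZ.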

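Finally, on Flink 1.14.0 this code no longer throws the error, but it still runs into the problem described in Note 3: the program does not actually run, and the console shows no output at all.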

EOF
