Problems with event-time window operations using the Flink 1.14.0 Table API

I recently found some time to get back into Flink and went straight to the latest stable release, 1.14.0. Things got off to a rough start: I ran into the following problem when doing a window aggregation with the Table API.
When the window is defined on event time, converting the result Table to a DataStream and calling print() produces no console output at all; switching the same window to processing time works fine.
Doing an event-time window with the DataStream API also works correctly, which left me completely puzzled.
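For comparison, here is a minimal sketch of the kind of DataStream job that does work. It is not the original code: the DeviceEvent POJO, the placeholder in-memory source and the fixed 5-second out-of-orderness bound are assumptions for illustration; only the per-device count over a 5-second event-time tumbling window mirrors the Table API job shown later.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountEventsPerDeviceByTumbleWindowDataStreamSketch {

    // Hypothetical event type standing in for the parsed Kafka JSON record.
    public static class DeviceEvent {
        public long actualTime;   // event time in epoch milliseconds
        public String deviceId;
        public DeviceEvent() {}
        public DeviceEvent(long actualTime, String deviceId) {
            this.actualTime = actualTime;
            this.deviceId = deviceId;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; the real job consumes the same Kafka topic as the Table API version.
        DataStream<DeviceEvent> events = env.fromElements(
                new DeviceEvent(System.currentTimeMillis(), "dev-001"),
                new DeviceEvent(System.currentTimeMillis() + 100, "dev-001"));

        events
                // Bounded out-of-orderness watermarks, analogous to the WATERMARK clause in the DDL.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<DeviceEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.actualTime))
                .keyBy(e -> e.deviceId)
                // 5-second event-time tumbling window, matching the TUMBLE TVF used in the SQL below.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Count the events per device per window.
                .process(new ProcessWindowFunction<DeviceEvent, String, String, TimeWindow>() {
                    @Override
                    public void process(String deviceId, Context ctx,
                                        Iterable<DeviceEvent> elements, Collector<String> out) {
                        long count = 0;
                        for (DeviceEvent ignored : elements) {
                            count++;
                        }
                        out.collect(deviceId + " -> " + count + " events, window end " + ctx.window().getEnd());
                    }
                })
                .print();

        env.execute("datastream event-time window sketch");
    }
}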
The core pom dependencies (groupId:artifactId:version) are as follows:
org.apache.flink:flink-java:1.14.0
org.apache.flink:flink-streaming-java_2.12:1.14.0
org.apache.flink:flink-clients_2.12:1.14.0
org.apache.flink:flink-connector-kafka_2.12:1.14.0
org.apache.flink:flink-json:1.14.0
org.apache.flink:flink-table-api-java-bridge_2.12:1.14.0
org.apache.flink:flink-table-planner_2.12:1.14.0
org.apache.flink:flink-streaming-scala_2.12:1.14.0
org.apache.flink:flink-table-common:1.14.0
The test code is as follows:
package com.nokia.itms.flink.sql;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nokia.itms.flink.FlinkConsumer;
import com.nokia.itms.kafka.KafkaConfig;
public class CountEventsPerDeviceByTumbleWindowTableAPI extends FlinkConsumer {
private static final Logger logger = LoggerFactory.getLogger(CountEventsPerDeviceByTumbleWindowTableAPI.class);
public CountEventsPerDeviceByTumbleWindowTableAPI(String topic) {
super(topic);
}
@Override
public void run() {
Properties kafkaProps = (Properties) KafkaConfig.getInstance().getKafkaProps().clone();
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.getClass().getSimpleName());
logger.info("Event max out of orderness = {} {}",MAX_OUT_OF_ORDERNESS.getSize(),MAX_OUT_OF_ORDERNESS.getUnit());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnv, bsSettings);
// Kafka source table; event_time is derived from the epoch-millis actualTime field,
// with a bounded-out-of-orderness watermark plus an additional processing-time column.
String createTable = "\n CREATE TABLE rxpower_detail (\n" +
" actualTime BIGINT,\n" +
" ponInfo ROW(PonRXPower INT),\n" +
" deviceInfo ROW(deviceId STRING), \n" +
" event_time AS TO_TIMESTAMP(FROM_UNIXTIME(actualTime/1000,'yyyy-MM-dd HH:mm:ss')) ,\n" +
// " event_time AS TO_TIMESTAMP_LTZ(actualTime,3) ,\n" +
" WATERMARK FOR event_time AS event_time - INTERVAL '" +
MAX_OUT_OF_ORDERNESS.getSize() +
"' " +
MAX_OUT_OF_ORDERNESS.getUnit() + " , \n" +
" proc_time as PROCTIME() \n" +
" )\n" +
" with ( \n" +
"'connector' = 'kafka'" + ",\n" +
"'topic' = '" + this.topic + "',\n" +
"'properties.bootstrap.servers' = '" + kafkaProps.getProperty("bootstrap.servers") + "',\n" +
//"'properties.group.id' = '" + kafkaProps.getProperty("group.id") + "',\n" +
"'scan.startup.mode' = 'latest-offset'" + ",\n" +
//"'scan.startup.mode' = 'group-offsets'" + ",\n" +
"'format' = 'json'" + ",\n" +
"'json.fail-on-missing-field' = 'false'" + ",\n" +
"'json.ignore-parse-errors' = 'true'" + "\n" +
")" + "\n" ;
logger.debug(createTable);
tableEnv.executeSql(createTable);
String selectSql = "";
// Count events per device over 5-second event-time tumbling windows (window TVF syntax).
selectSql = ""
+ "\ncreate temporary view tumble_windowed_result as "
+ "\n select window_start as w_s,window_end as w_e, window_time as w_t, deviceId, count(deviceId) as lc "
+ "\n from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(event_time), INTERVAL '5' SECOND))"
// + " from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(proc_time), INTERVAL '5' SECOND))"
+ "\n group by window_start,window_end,window_time, deviceId "
+ "\n"
;
logger.info(selectSql);
tableEnv.executeSql(selectSql);
//
selectSql = "select * from tumble_windowed_result " ;
logger.info(selectSql);
Table table2 = tableEnv.sqlQuery(selectSql);
// Convert the result table to a DataStream<Row> and print it to the console.
DataStream<Row> resultStream = tableEnv.toDataStream(table2);
resultStream.print();
try {
execEnv.execute("table api test");
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
public static void main(String... args) {
new CountEventsPerDeviceByTumbleWindowTableAPI("topic.periodic").run();
}
}
Running the code above and feeding simulated test data into Kafka produces no console output at all. However, if the line
from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(event_time), INTERVAL '5' SECOND))
is switched to the processing-time column, i.e.
from TABLE (TUMBLE(TABLE rxpower_detail,DESCRIPTOR(proc_time), INTERVAL '5' SECOND))
then everything works and the aggregated counts show up on the console as expected.
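For reference, a minimal, hypothetical producer for that simulated test data could look like the sketch below. Only the JSON shape (actualTime, ponInfo.PonRXPower, deviceInfo.deviceId) is taken from the DDL above; the broker address, record key, interval and field values are placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RxPowerTestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record per second; actualTime carries the event time in epoch millis,
            // matching the rxpower_detail schema defined in the DDL.
            for (int i = 0; i < 30; i++) {
                String value = String.format(
                        "{\"actualTime\": %d, \"ponInfo\": {\"PonRXPower\": -25}, \"deviceInfo\": {\"deviceId\": \"dev-001\"}}",
                        System.currentTimeMillis());
                producer.send(new ProducerRecord<>("topic.periodic", "dev-001", value));
                producer.flush();
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}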
In addition, if I switch the Flink version to 1.13.2 and use the Blink table planner, both event time and processing time work correctly (a minimal sketch of the planner selection follows the dependency list below). The core pom dependencies after switching to 1.13.2 are:
org.apache.flink:flink-java:1.13.2
org.apache.flink:flink-streaming-java_2.12:1.13.2
org.apache.flink:flink-clients_2.12:1.13.2
org.apache.flink:flink-connector-kafka_2.12:1.13.2
org.apache.flink:flink-json:1.13.2
org.apache.flink:flink-table-api-java-bridge_2.12:1.13.2
org.apache.flink:flink-table-planner_2.12:1.13.2
org.apache.flink:flink-streaming-scala_2.12:1.13.2
org.apache.flink:flink-table-planner-blink_2.12:1.13.2
org.apache.flink:flink-table-common:1.13.2
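For completeness, a minimal sketch of how the table environment is created on 1.13.2 with the Blink planner (the class and method names below are from the 1.13 API; the rest of the job is unchanged from the listing above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static StreamTableEnvironment create() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // useBlinkPlanner() selects the Blink planner explicitly on 1.13.x;
        // in 1.14.0 the Blink planner is the only planner and the call is no longer needed.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        return StreamTableEnvironment.create(execEnv, settings);
    }
}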
I am not sure whether this is actually an issue specific to Flink 1.14.0.