Maven的依赖
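<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
    <scala.version>2.12</scala.version>
    <hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>5.0.0-Hbase-2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.9.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>29.0-jre</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.janeluo</groupId>
        <artifactId>ikanalyzer</artifactId>
        <version>2012_u6</version>
    </dependency>
    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.2-patch4</version>
        <classifier>http</classifier>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>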
核心代码
// Streaming job code: sets up a table environment, batches the input stream into
// one-minute windows for a sink, and runs a grouped SQL aggregation over the same stream.
// `env` and `dataStreams` are defined outside this fragment.
// Table settings: Blink planner, streaming mode.
EnvironmentSettings environment = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, environment);
//dataStream.print();
// Use one minute as the window period: all elements of each one-minute window are
// collected into a single list.
// NOTE(review): the generic type arguments on this statement and on apply(...) look
// truncated (unbalanced '>'), presumably lost when the snippet was extracted; restore
// the element type before compiling.
// NOTE(review): timeWindowAll appears to be deprecated in Flink 1.12 in favour of
// windowAll(...) with an explicit window assigner; confirm against the Flink docs.
SingleOutputStreamOperator> streamOperator = dataStreams.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
@Override
public void apply(TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception {
// Materialize the window's elements into a list.
ArrayList list = Lists.newArrayList(iterable);
// Emit the list only when the window contained at least one element.
if (list.size() > 0) {
collector.collect(list);
}
}
});
//dataStreams.print();
// Expose the raw (un-windowed) stream as a table with the listed field names and
// register it as temporary view "t1" so it can be queried with SQL.
Table table = streamTableEnvironment.fromDataStream(dataStreams, "user_id,item_id,cate_id,times,name,keyword,factory,price,pro,city,par,brank");
streamTableEnvironment.createTemporaryView("t1", table);
// Send each per-window list to OrderSinkFunc (defined elsewhere).
streamOperator.addSink(new OrderSinkFunc());
// Unused window clause kept from the original:
//,tumble(times, interval '1' day)
// Per (item_id, name): row count as `num` and summed price as `total`.
Table table1 = streamTableEnvironment.sqlQuery("select item_id,name,count(*)as num ,sum(price) as total from t1 group by item_id,name ");
// Supports retraction: the query result is converted to a retract stream of Row and printed.
streamTableEnvironment.toRetractStream(table1, Row.class).print("输出结果");