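Over the past few days I upgraded our Flink project from 1.12 to 1.14.2, the latest release at the time, and found that the project's CEP events no longer produced any output. Even printing the stream to the console showed nothing.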
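Cause
Since version 1.12, Flink's PatternStream uses event time by default. If your job relies on processing time, you must configure that explicitly. In event-time mode CEP depends on timestamps and watermarks, so a stream that carries neither never yields a match.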
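To make the cause more concrete, here is a minimal toy model of event-time processing in plain Java. It is only a sketch, not Flink's actual implementation: the `EventTimeBuffer` class and its `onEvent` / `onWatermark` methods are hypothetical names invented for this illustration. The idea it shows is that events are held back until a watermark passes their timestamp, so a stream with no watermarks emits nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model (not Flink code): an event-time operator that holds events
// back until a watermark has passed their timestamp.
class EventTimeBuffer {

    // An event with its event-time timestamp (hypothetical type for this sketch).
    private static final class Event implements Comparable<Event> {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }

        @Override
        public int compareTo(Event other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    private final PriorityQueue<Event> pending = new PriorityQueue<>();
    private final List<String> emitted = new ArrayList<>();

    // In event time an incoming event is only buffered; nothing is emitted yet.
    void onEvent(long timestamp, String payload) {
        pending.add(new Event(timestamp, payload));
    }

    // A watermark releases every buffered event with timestamp <= watermark,
    // in timestamp order.
    void onWatermark(long watermark) {
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            emitted.add(pending.poll().payload);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EventTimeBuffer buffer = new EventTimeBuffer();
        buffer.onEvent(200L, "b");
        buffer.onEvent(100L, "a");
        System.out.println(buffer.emitted()); // prints [] because no watermark has arrived
        buffer.onWatermark(150L);
        System.out.println(buffer.emitted()); // prints [a]
    }
}
```

In this model, a source that never sends a watermark leaves the output empty forever, which matches the symptom described above. Processing time has no such dependency, which is why switching modes explicitly makes the output reappear.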
Solution
Sample code: the following code produces no output at all.
package spendreport;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class CEPtest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        DataStream<Transaction> dataStream = env.addSource(new TransactionSource());

        // Match one or more transactions whose amount exceeds 300.
        Pattern<Transaction, Transaction> pattern = Pattern
                .<Transaction>begin("begin", AfterMatchSkipStrategy.noSkip())
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction transaction) throws Exception {
                        return transaction.getAmount() > 300;
                    }
                })
                .timesOrMore(1);

        // No time mode is set here, so the PatternStream defaults to event time.
        PatternStream<Transaction> patternStream = CEP.pattern(dataStream, pattern);

        DataStream<Transaction> d = patternStream
                .select((PatternSelectFunction<Transaction, Transaction>)
                        map -> map.get("begin").get(0))
                .name("bbbb");
        d.print();
        env.execute();
    }
}
The following code, by contrast, works as expected:
package spendreport;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class CEPtest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        DataStream<Transaction> dataStream = env.addSource(new TransactionSource());

        // Match one or more transactions whose amount exceeds 300.
        Pattern<Transaction, Transaction> pattern = Pattern
                .<Transaction>begin("begin", AfterMatchSkipStrategy.noSkip())
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction transaction) throws Exception {
                        return transaction.getAmount() > 300;
                    }
                })
                .timesOrMore(1);

        // The fix: explicitly switch the PatternStream to processing time.
        PatternStream<Transaction> patternStream =
                CEP.pattern(dataStream, pattern).inProcessingTime();

        DataStream<Transaction> d = patternStream
                .select((PatternSelectFunction<Transaction, Transaction>)
                        map -> map.get("begin").get(0))
                .name("bbbb");
        d.print();
        env.execute();
    }
}
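All you need to do is append inProcessingTime() or inEventTime() to the CEP.pattern(...) call:
PatternStream<Transaction> patternStream = CEP.pattern(dataStream, pattern).inProcessingTime();
Summary
When the code is running, leave it alone if you can!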
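Note: the data source used in the example ships with Flink, so the following dependency must be added to your Maven pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>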



