栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 1.14.2 CEP 没有输出

Flink 1.14.2 CEP 没有输出

背景

最近几天把Flink项目的版本从1.12升级到了最新的1.14.2,然后发现项目里的CEP事件都没有输出了,即使将Stream打印到控制台,也是啥也没有。

问题原因

Flink在1.12版本之后,PatternStream默认使用Event Time。如果业务使用的事Processing Time,必须要明确配置。

解决办法

样例代码,下面的代码是不会有任何输出的。

package spendreport;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;



public class CEPtest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStream dataStream = env.addSource(new TransactionSource());

        Pattern pattern = Pattern.begin("begin", AfterMatchSkipStrategy.noSkip()).where(
                new SimpleCondition() {
                    @Override
                    public boolean filter(Transaction transaction) throws Exception {
                        return transaction.getAmount() > 300;
                    }
                }).timesOrMore(1);

        PatternStream patternStream = CEP.pattern(dataStream, pattern);
        DataStream d = patternStream.select(
                (PatternSelectFunction) map -> map.get("begin").get(0))
                .name("bbbb");

        d.print();

        env.execute();
    }
}

 

而下面的代码就正常了

package spendreport;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;



public class CEPtest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStream dataStream = env.addSource(new TransactionSource());

        Pattern pattern = Pattern.begin("begin", AfterMatchSkipStrategy.noSkip()).where(
                new SimpleCondition() {
                    @Override
                    public boolean filter(Transaction transaction) throws Exception {
                        return transaction.getAmount() > 300;
                    }
                }).timesOrMore(1);

        PatternStream patternStream = CEP.pattern(dataStream, pattern).inProcessingTime();
        DataStream d = patternStream.select(
                (PatternSelectFunction) map -> map.get("begin").get(0))
                .name("bbbb");

        d.print();

        env.execute();
    }
}

 只需要在在后面加上inProcessingTime()或者inEventTime()即可。

PatternStream patternStream = CEP.pattern(dataStream, pattern).inProcessingTime();
总结

当代码能跑起来的时候,能别动就别动!

备注:示例中的数据源是Flink内置数据源,需要加载Maven pom:

        
			org.apache.flink
			flink-walkthrough-common_${scala.binary.version}
			${flink.version}
		

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699341.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号