
Flink 1.10.1 CEP (Java): Implementing Complex Event Pattern Matching

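The base environment for this article is the one described in "Flink 1.10.1 Java WordCount demo (nc + socket)". On top of that setup, this article adds CEP to test complex pattern matching.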

1. Add the dependency

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>

The flink-cep version must match the Flink version in use (1.10.1 here); a mismatch can cause a variety of errors.

2. Program code
package com.demo.cep;

import com.demo.entity.DataModel;
import com.demo.entity.ResultModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPDemo {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // Read the input data from a file
        String inputPath = "data/testdata.txt";
        DataStream<String> inputDataSet = env.readTextFile(inputPath);
        // Parse each line (code,price,high,low,open) into a DataModel
        DataStream<DataModel> dataStream = inputDataSet.map(new MapFunction<String, DataModel>() {
            @Override
            public DataModel map(String s) throws Exception {
                String[] splits = s.split(",");

                return new DataModel(splits[0], Float.parseFloat(splits[1]),
                        Float.parseFloat(splits[2]), Float.parseFloat(splits[3]),
                        Float.parseFloat(splits[4]));
            }
        });


        Pattern<DataModel, DataModel> pattern = Pattern.<DataModel>begin("start").where(
                new SimpleCondition<DataModel>() {
                    @Override
                    public boolean filter(DataModel dataModel) throws Exception {
                        return dataModel.getPrice() > 4;
                    }
                })
                .times(1)
                .followedBy("middle").where(
                        new SimpleCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel) throws Exception {
                                return dataModel.getPrice() <= 3;
                            }
                        }).times(2, 4).greedy()
                .followedBy("end").where(
                        new IterativeCondition<DataModel>() {
                            @Override
                            public boolean filter(DataModel dataModel, Context<DataModel> ctx) throws Exception {

                                // Start from the price of the current event
                                double sum = dataModel.getPrice();

                                // Fetch the events already matched by the "middle" pattern
                                Iterable<DataModel> middle = ctx.getEventsForPattern("middle");

                                for (DataModel model : middle) {
                                    sum += model.getPrice();
                                }

                                // sum only demonstrates access to earlier matches;
                                // the decision below uses the current price alone
                                return dataModel.getPrice() > 4;
                            }
                        });

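        // Apply the pattern per key (the code field)
        PatternStream<DataModel> patternStream = CEP.pattern(dataStream.keyBy(DataModel::getCode), pattern);

        // Convert each match into a ResultModel
        SingleOutputStreamOperator<ResultModel> select = patternStream.select(new MyPatternSelectFunction());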

        select.print();

        env.execute();

    }

    public static class MyPatternSelectFunction implements PatternSelectFunction<DataModel, ResultModel> {
        @Override
        public ResultModel select(Map<String, List<DataModel>> pattern) {
            // The map is keyed by pattern name; each value lists the events matched by that pattern
            DataModel startEvent = pattern.get("start").get(0);
            DataModel endEvent = pattern.get("end").get(0);

            return new ResultModel(startEvent.getCode(), startEvent.getPrice(), endEvent.getPrice());
        }
    }

}

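The pattern starts at an event whose price is greater than 4, then matches 2 to 4 events whose price is at most 3 (greedily, taking as many as possible), and completes when an event with a price greater than 4 follows. Note that followedBy() and times() use relaxed contiguity by default, so non-matching events in between are skipped rather than breaking the match; strictly adjacent matching would need next() and consecutive(). The "end" condition also sums the prices of the "middle" events via ctx.getEventsForPattern(), but that sum is not used in the decision.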
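
To make the matching order concrete, the following is a simplified plain-Java sketch of the same three steps, without Flink. It is not how Flink's CEP engine works internally: it assumes the matched events are directly adjacent in the stream (which holds for the sample file data/testdata.txt) and only returns the first match. The class name GreedyPatternSketch and the method firstMatch are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GreedyPatternSketch {

    /**
     * Returns the first match as a map from pattern name to matched prices,
     * or an empty map if no match exists. Simplification: events must be adjacent.
     */
    public static Map<String, List<Float>> firstMatch(float[] prices) {
        for (int i = 0; i < prices.length; i++) {
            if (!(prices[i] > 4)) {
                continue; // not a valid "start" event
            }

            // "middle": greedily take up to 4 events with price <= 3
            List<Float> middle = new ArrayList<>();
            int j = i + 1;
            while (j < prices.length && middle.size() < 4 && prices[j] <= 3) {
                middle.add(prices[j]);
                j++;
            }
            if (middle.size() < 2) {
                continue; // times(2, 4) needs at least 2 events
            }

            // "end": the next event must have price > 4
            if (j < prices.length && prices[j] > 4) {
                Map<String, List<Float>> match = new LinkedHashMap<>();
                match.put("start", Collections.singletonList(prices[i]));
                match.put("middle", middle);
                match.put("end", Collections.singletonList(prices[j]));
                return match;
            }
        }
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        // Price column of data/testdata.txt
        float[] prices = {1, 1, 1, 5, 2, 1, 2, 3, 6};
        System.out.println(firstMatch(prices));
        // prints {start=[5.0], middle=[2.0, 1.0, 2.0, 3.0], end=[6.0]}
    }
}
```

The returned map mirrors the shape of the argument that a PatternSelectFunction receives: one list of events per pattern name. Under the stated assumptions, the first "start" entry and the "end" entry are the two values that end up in the result.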

3. Supporting code
package com.demo.entity;

public class DataModel {

    private String code;
    private float price;
    private float high;
    private float low;
    private float open;

    public DataModel() {
    }

    public DataModel(String code, float price, float high, float low, float open) {
        this.code = code;
        this.price = price;
        this.high = high;
        this.low = low;
        this.open = open;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    public float getHigh() {
        return high;
    }

    public void setHigh(float high) {
        this.high = high;
    }

    public float getLow() {
        return low;
    }

    public void setLow(float low) {
        this.low = low;
    }

    public float getOpen() {
        return open;
    }

    public void setOpen(float open) {
        this.open = open;
    }

    @Override
    public String toString() {
        return "DataModel{" +
                "code='" + code + '\'' +
                ", price=" + price +
                ", high=" + high +
                ", low=" + low +
                ", open=" + open +
                '}';
    }
}
4. Result class definition
package com.demo.entity;

public class ResultModel {

    private String code;
    private float startPrice;
    private float endPrice;

    public ResultModel() {
    }

    public ResultModel(String code, float startPrice, float endPrice) {
        this.code = code;
        this.startPrice = startPrice;
        this.endPrice = endPrice;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public float getStartPrice() {
        return startPrice;
    }

    public void setStartPrice(float startPrice) {
        this.startPrice = startPrice;
    }

    public float getEndPrice() {
        return endPrice;
    }

    public void setEndPrice(float endPrice) {
        this.endPrice = endPrice;
    }

    @Override
    public String toString() {
        return "ResultModel{" +
                "code='" + code + '\'' +
                ", startPrice=" + startPrice +
                ", endPrice=" + endPrice +
                '}';
    }
}
5. Test data

data/testdata.txt

0000001,1,1,1,1
0000001,1,1,1,1
0000001,1,1,1,1
0000001,5,1,1,1
0000001,2,1,1,1
0000001,1,1,1,1
0000001,2,1,1,1
0000001,3,1,1,1
0000001,6,1,1,1
6. Run the program and test

The output shows that a match was found. The result combines the first and the last event of the matched pattern.

ResultModel{code='0000001', startPrice=5.0, endPrice=6.0}
If you repost this article, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/730466.html