
Flink CEP Explained with a Worked Example

1. Introduction

FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows you to detect event patterns in an endless stream of events, giving you the chance to get hold of what's important in your data. It is commonly used for complex-event scenarios such as risk-control rules over user app-operation logs. Below we walk through one requirement in full: raise an alert when a user fails to log in 3 times within 10 seconds.

1.1. Overall Requirement Diagram

2. Official Example

The official code example is as follows:

DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });
2.1. Official Example Summary

CEP programming steps:
a) Define the pattern sequence

Pattern.begin("patternName").API...

Custom pattern rules are essentially always built following this template.

The APIs that can be chained afterwards are documented in the official docs:

Event Processing (CEP) | Apache Flink
b) Apply the pattern sequence to a stream

CEP.pattern(inputDataStream, pattern)

CEP.pattern() is a fixed-format call:

the first argument is the stream the pattern should be applied to;

the second argument is the custom pattern itself.
c) Extract and emit the matched data

Process the stream produced in b) with the process API: extend PatternProcessFunction and override processMatch(Map<String, List<IN>> pattern, Context ctx, Collector<OUT> out), where

the first parameter holds the matched data: each key of the Map is a "patternName" defined in step a), and its value is the list of events matched by that named pattern;

the second parameter is the context, through which data that timed out without completing a match can be sent to a side output;

the third parameter collects whatever the function needs to emit downstream once processing is done.

3. Requirement Walkthrough

Below we simulate reading user-operation log lines from a socket and run CEP matching over them.

The following code flattens the incoming lines into a JavaBean. This section walks through the code in fragments; the complete demo code is listed in a later section.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
                );
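As context for the watermark strategy above: forBoundedOutOfOrderness keeps the watermark trailing the largest timestamp seen so far by the configured bound plus one millisecond (watermark = maxSeenTimestamp - outOfOrderness - 1). The plain-JDK sketch below simulates that value after each event; the class and method names are illustrative only, and real Flink emits watermarks periodically rather than per event.

```java
import java.util.List;

public class WatermarkSketch {

    static final long OUT_OF_ORDERNESS_MS = 1_000; // Duration.ofSeconds(1)

    /**
     * Watermark value after each event: the highest timestamp seen so
     * far, minus the allowed out-of-orderness, minus 1 ms.
     */
    public static long[] watermarks(List<Long> eventTimestamps) {
        long[] result = new long[eventTimestamps.size()];
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < eventTimestamps.size(); i++) {
            maxSeen = Math.max(maxSeen, eventTimestamps.get(i));
            result[i] = maxSeen - OUT_OF_ORDERNESS_MS - 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Three loginTime values from the demo input, the third arriving out of order;
        // the watermark does not move backwards for the late event.
        System.out.println(java.util.Arrays.toString(
                watermarks(List.of(1645177352000L, 1645177354000L, 1645177353000L))));
        // → [1645177350999, 1645177352999, 1645177352999]
    }
}
```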
3.1. Using begin.where.next.where.next
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("second").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("third").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).within(Time.seconds(10));

The pattern above starts a match on a failed login (loginStatus == 1); because next() requires strict contiguity, a match is emitted only when the two immediately following events are also failures.

//With the log input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
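Since the expected output can be reasoned out by hand, the following plain-JDK sketch replays the same log and applies the strict-contiguity rule (three immediately consecutive failures within 10 seconds), collecting the first loginId of each match, which is what MyPatternProcessFunction emits. The LogEvent class and strictStarts helper are illustrative stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class StrictContiguityCheck {

    /** Minimal stand-in for the demo's UserLoginLog bean (illustrative only). */
    public static class LogEvent {
        final int loginId;
        final long loginTime;   // epoch millis
        final int loginStatus;  // 1 = failure, 0 = success

        LogEvent(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    /** The same 11-event log used in section 3.1. */
    public static final List<LogEvent> SAMPLE_LOG = List.of(
            new LogEvent(11111, 1645177352000L, 1),
            new LogEvent(11112, 1645177353000L, 1),
            new LogEvent(11113, 1645177354000L, 1),
            new LogEvent(11116, 1645177355000L, 1),
            new LogEvent(11117, 1645177356000L, 1),
            new LogEvent(11118, 1645177357000L, 1),
            new LogEvent(11119, 1645177358000L, 1),
            new LogEvent(11120, 1645177359000L, 0),
            new LogEvent(11121, 1645177360000L, 1),
            new LogEvent(11122, 1645177361000L, 1),
            new LogEvent(11123, 1645177362000L, 1));

    /**
     * Strict contiguity: every window of 3 back-to-back failures whose
     * first and last timestamps are within 10 seconds. Returns the
     * starting loginId of each match.
     */
    public static List<Integer> strictStarts(List<LogEvent> log) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + 2 < log.size(); i++) {
            LogEvent a = log.get(i), b = log.get(i + 1), c = log.get(i + 2);
            boolean allFailed = a.loginStatus == 1 && b.loginStatus == 1 && c.loginStatus == 1;
            boolean within10s = c.loginTime - a.loginTime < 10_000;
            if (allFailed && within10s) {
                starts.add(a.loginId);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(strictStarts(SAMPLE_LOG));
        // → [11111, 11112, 11113, 11116, 11117, 11121]
    }
}
```

The successful login 11120 breaks the failure run, which is why 11118 and 11119 never start a match here.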

3.1.1. Requirement Output Diagram

3.2. Using begin.times
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).within(Time.seconds(10));

As above, counting starts on a failed login; as long as a second and a third failure appear within 10 seconds, a match is emitted. In essence the three failures no longer need to be consecutive: times(3) uses relaxed contiguity by default, so non-matching events in between are skipped.

//With the log input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
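The relaxed-contiguity behavior can be cross-checked the same way: the plain-JDK sketch below starts from each failure, skips over non-failures to pick up the next two failures, and keeps matches that fit inside 10 seconds. Again, LogEvent and relaxedStarts are illustrative stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class RelaxedContiguityCheck {

    /** Minimal stand-in for the demo's UserLoginLog bean (illustrative only). */
    public static class LogEvent {
        final int loginId;
        final long loginTime;   // epoch millis
        final int loginStatus;  // 1 = failure, 0 = success

        LogEvent(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    /** The same 11-event log used in section 3.2. */
    public static final List<LogEvent> SAMPLE_LOG = List.of(
            new LogEvent(11111, 1645177352000L, 1),
            new LogEvent(11112, 1645177353000L, 1),
            new LogEvent(11113, 1645177354000L, 1),
            new LogEvent(11116, 1645177355000L, 1),
            new LogEvent(11117, 1645177356000L, 1),
            new LogEvent(11118, 1645177357000L, 1),
            new LogEvent(11119, 1645177358000L, 1),
            new LogEvent(11120, 1645177359000L, 0),
            new LogEvent(11121, 1645177360000L, 1),
            new LogEvent(11122, 1645177361000L, 1),
            new LogEvent(11123, 1645177362000L, 1));

    /**
     * Relaxed contiguity: from each failure, take the next two failures
     * anywhere later in the stream (non-matching events are skipped) and
     * require the three to fall within 10 seconds.
     */
    public static List<Integer> relaxedStarts(List<LogEvent> log) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (log.get(i).loginStatus != 1) continue;
            List<LogEvent> match = new ArrayList<>();
            match.add(log.get(i));
            for (int j = i + 1; j < log.size() && match.size() < 3; j++) {
                if (log.get(j).loginStatus == 1) match.add(log.get(j));
            }
            if (match.size() == 3
                    && match.get(2).loginTime - match.get(0).loginTime < 10_000) {
                starts.add(log.get(i).loginId);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(relaxedStarts(SAMPLE_LOG));
        // → [11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121]
    }
}
```

Unlike the strict variant, 11118 and 11119 now start matches because the successful login 11120 is simply skipped over.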

3.2.1. Requirement Diagram

3.3. Using begin.times.consecutive
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).consecutive().within(Time.seconds(10));

Adding consecutive() on top of 3.2 switches the looping pattern to strict contiguity, which gives exactly the same behavior as 3.1.

//With the log input below, the emitted loginIds are: 11111, 11112, 11113, 11116, 11117, 11121

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}

4. Full Demo Code

4.1. pom File

    <properties>
        <flink.version>1.14.3</flink.version>
        <hadoop.version>2.7.5</hadoop.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.4.0</kafka.version>
        <redis.version>3.3.0</redis.version>
        <lombok.version>1.18.6</lombok.version>
        <fastjson.verson>1.2.72</fastjson.verson>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.code.findbugs</groupId>
                    <artifactId>jsr305</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>force-shading</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.verson}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
4.2. UserLoginLog Class
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
class UserLoginLog {
    // Login record id
    private int loginId;
    // Login time, epoch milliseconds
    private long loginTime;
    // Login status: 1 = failure, 0 = success
    private int loginStatus;
    // User name
    private String userName;
}
4.3. MyFlatMapFunction Class
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class MyFlatMapFunction implements FlatMapFunction<String, UserLoginLog> {

    @Override
    public void flatMap(String value, Collector<UserLoginLog> out) throws Exception {
        // Parse each non-blank JSON line into a UserLoginLog bean.
        if (StringUtils.isNotBlank(value)) {
            UserLoginLog userLoginLog = JSONObject.parseObject(value, UserLoginLog.class);
            out.collect(userLoginLog);
        }
    }
}
4.4. MyPatternProcessFunction Class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

@Slf4j
public class MyPatternProcessFunction extends PatternProcessFunction<UserLoginLog, UserLoginLog> {

    @Override
    public void processMatch(Map<String, List<UserLoginLog>> match, Context ctx, Collector<UserLoginLog> out) throws Exception {
        // Emit the first event matched by the "start" pattern, i.e. where the failure run began.
        List<UserLoginLog> start = match.get("start");
        out.collect(start.get(0));
    }
}

4.5. Main Class

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

@Slf4j
public class CepLearning {
    public static void main(String[] args) {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
                );

        // 3.1: strict contiguity via next()
        Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("second").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("third").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).within(Time.seconds(10));

        // 3.2: relaxed contiguity via times(3)
        Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).within(Time.seconds(10));

        // 3.3: strict contiguity via times(3).consecutive()
        Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).consecutive().within(Time.seconds(10));

        PatternStream<UserLoginLog> patternStream = CEP.pattern(dataStream, wherePatternOne);
        PatternStream<UserLoginLog> patternStream1 = CEP.pattern(dataStream, wherePatternTwo);
        PatternStream<UserLoginLog> patternStream2 = CEP.pattern(dataStream, wherePatternThree);

        SingleOutputStreamOperator<UserLoginLog> process = patternStream.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process1 = patternStream1.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process2 = patternStream2.process(new MyPatternProcessFunction());

        process.print("resultOutPut");
        process1.print("resultOutPutTwo");
        process2.print("resultOutPutThree");

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Source: https://www.mshxw.com/it/746581.html (please credit www.mshxw.com when reposting).