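FlinkCEP is the complex event processing (CEP) library implemented on top of Flink. It lets you detect event patterns in an endless stream of events, giving you the chance to get hold of what matters in your data. It is commonly used for risk-control rules over user app-activity logs and many other kinds of complex events. Below, the requirement "raise an alert when a user fails to log in three times within 10 seconds" is used to walk through FlinkCEP in full.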
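1.1 Diagram of the overall requirement data
[Figure: overview of the requirement data]
2. Official example
The official code example is as follows: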
DataStream<Event> input = ...;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
        }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });
2.1 Summary of the official example
CEP programming steps
a) Define the pattern sequence
Pattern.<Event>begin("patternName").where(...)
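Custom pattern rules are basically always built following the template above.
The APIs that can be chained after it are documented in the official guide: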
Event Processing (CEP) | Apache Flink
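b) Apply the pattern sequence to the stream
CEP.pattern(inputDataStream, pattern)
CEP.pattern() is always called in this fixed form:
the first argument is the stream the pattern is applied to;
the second argument is the custom pattern.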
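c) Extract and output the matched data
Process the stream produced in b) with the process API: extend PatternProcessFunction and override processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out), whose arguments are:
the first argument holds the matched data: each Map key is a pattern name defined in step a) (the "patternName"), and the value is the list of events matched by that named pattern;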
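the second argument is the context, which provides time information such as the match timestamp and lets you emit records to a side output with ctx.output(...) (timed-out partial matches are handled separately, by implementing TimedOutPartialMatchHandler);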
the third argument is the Collector that gathers whatever the function emits downstream.
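To make the first argument of processMatch concrete, here is a plain-Java sketch of the match maps for the two kinds of patterns used later in this article. It has no Flink dependency, so it only mimics the data shape; Integers stand in for the matched UserLoginLog events, and the class and method names are hypothetical. With begin("start").next("second").next("third") each name maps to one event, while with begin("start").times(3) all three events sit in the list under "start", which is why match.get("start").get(0) yields the first failed login in both cases.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only (not Flink code): the shape of the Map that
// processMatch receives for a single match. Keys are pattern names, values
// are the events (here: hypothetical loginId stand-ins) matched by each name.
final class MatchMapSketch {

    // begin("start").next("second").next("third"): one event per pattern name.
    static Map<String, List<Integer>> nextNextMatch(int a, int b, int c) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("start", Collections.singletonList(a));
        match.put("second", Collections.singletonList(b));
        match.put("third", Collections.singletonList(c));
        return match;
    }

    // begin("start").times(3): all three events collected under one name.
    static Map<String, List<Integer>> timesThreeMatch(int a, int b, int c) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList(a, b, c));
        return match;
    }

    // What MyPatternProcessFunction does: take the first event under "start".
    static int firstOfStart(Map<String, List<Integer>> match) {
        return match.get("start").get(0);
    }

    public static void main(String[] args) {
        System.out.println(nextNextMatch(11111, 11112, 11113));
        System.out.println(timesThreeMatch(11111, 11112, 11113));
    }
}
```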
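3. The requirement in detail
Below, user operation logs are simulated by reading them from a socket, and CEP matching is run on them to produce output.
The code below parses each incoming line into a JavaBean. This section walks through code snippets; the complete demo code is listed in a later section.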
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
        );
3.1 Using begin.where.next.where.next
// loginStatus == 1 marks a failed login
Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).next("second").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).next("third").where(new SimpleCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).within(Time.seconds(10));
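The pattern above starts counting when a login fails (loginStatus == 1). If the very next record and the one after it are also failures, a match is emitted. Because next() enforces strict contiguity, any other event in between, such as a successful login, breaks the match.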
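As a sanity check on strict contiguity, the plain-Java sketch below (not Flink code; the class, its Login stand-in and its methods are hypothetical) replays the sample input listed for this section and reports the first loginId of every run of three adjacent failed logins. It assumes within(10 s) means the last event must be less than 10 seconds after the first, and that every failed login starts a new match attempt, as it does under Flink's default after-match skip strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only, NOT Flink code: mimics
// begin("start").next("second").next("third").within(10 s) on the sample input.
final class StrictContiguitySketch {

    // Hypothetical stand-in for UserLoginLog with just the fields the check needs.
    static final class Login {
        final int loginId;
        final long loginTime;
        final int loginStatus; // 1 = failed login, as in the article

        Login(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    // The eleven sample records: one per second, only 11120 is a successful login.
    static List<Login> sampleInput() {
        int[] ids = {11111, 11112, 11113, 11116, 11117, 11118, 11119, 11120, 11121, 11122, 11123};
        List<Login> events = new ArrayList<>();
        for (int k = 0; k < ids.length; k++) {
            int status = ids[k] == 11120 ? 0 : 1;
            events.add(new Login(ids[k], 1645177352000L + k * 1000L, status));
        }
        return events;
    }

    // First loginId of every run of `count` adjacent failed logins whose last
    // event is less than windowMs after the first (assumed within() semantics).
    static List<Integer> firstIdsOfMatches(List<Login> events, int count, long windowMs) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i + count <= events.size(); i++) {
            boolean allFailed = true;
            for (int j = i; j < i + count; j++) {
                if (events.get(j).loginStatus != 1) { // strict: nothing else may sit in between
                    allFailed = false;
                    break;
                }
            }
            long span = events.get(i + count - 1).loginTime - events.get(i).loginTime;
            if (allFailed && span < windowMs) {
                result.add(events.get(i).loginId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [11111, 11112, 11113, 11116, 11117, 11121]
        System.out.println(firstIdsOfMatches(sampleInput(), 3, 10_000L));
    }
}
```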
// With the following log input, the job finally outputs loginId: 11111, 11112, 11113, 11116, 11117, 11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
3.1.1 Output diagram
[Figure: diagram of the matched output]
3.2 Using begin.times
Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).times(3).within(Time.seconds(10));
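This pattern also starts counting at a failed login, and emits a match as soon as a second and a third failure show up within 10 seconds. The essential difference from 3.1 is that the failures do not have to be consecutive: times() uses relaxed contiguity by default, so non-matching events in between (such as a successful login) are simply skipped.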
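The only change from strict contiguity is how non-matching events are treated. The plain-Java sketch below (hypothetical, not Flink code) mimics the default relaxed contiguity of times(3): starting from each failed login it skips non-failures such as the successful 11120 and keeps counting failures until three are found or the 10-second window runs out. As before, it assumes an event exactly 10 seconds after the start falls outside within().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only, NOT Flink code: mimics
// begin("start").times(3).within(10 s) with relaxed contiguity.
final class RelaxedContiguitySketch {

    // Hypothetical stand-in for UserLoginLog.
    static final class Login {
        final int loginId;
        final long loginTime;
        final int loginStatus; // 1 = failed login

        Login(int loginId, long loginTime, int loginStatus) {
            this.loginId = loginId;
            this.loginTime = loginTime;
            this.loginStatus = loginStatus;
        }
    }

    // Same eleven sample records as in the article.
    static List<Login> sampleInput() {
        int[] ids = {11111, 11112, 11113, 11116, 11117, 11118, 11119, 11120, 11121, 11122, 11123};
        List<Login> events = new ArrayList<>();
        for (int k = 0; k < ids.length; k++) {
            int status = ids[k] == 11120 ? 0 : 1;
            events.add(new Login(ids[k], 1645177352000L + k * 1000L, status));
        }
        return events;
    }

    // First loginId of every match: `count` failures within windowMs,
    // with any non-failures in between ignored.
    static List<Integer> firstIdsOfMatches(List<Login> events, int count, long windowMs) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Login start = events.get(i);
            if (start.loginStatus != 1) {
                continue; // a match has to begin with a failed login
            }
            int failures = 1;
            for (int j = i + 1; j < events.size() && failures < count; j++) {
                Login next = events.get(j);
                if (next.loginTime - start.loginTime >= windowMs) {
                    break; // later events fall outside within()
                }
                if (next.loginStatus == 1) {
                    failures++; // relaxed: successful logins are skipped, not fatal
                }
            }
            if (failures == count) {
                result.add(start.loginId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121]
        System.out.println(firstIdsOfMatches(sampleInput(), 3, 10_000L));
    }
}
```

Running the strict variant instead on the same records drops 11118 and 11119, because their runs are interrupted by 11120.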
// With the following log input, the job finally outputs loginId: 11111, 11112, 11113, 11116, 11117, 11118, 11119, 11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
3.2.1 Requirement diagram
[Figure: diagram of the requirement]
3.3 Using begin.times.consecutive
Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
    @Override
    public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
        return 1 == value.getLoginStatus();
    }
}).times(3).consecutive().within(Time.seconds(10));
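Compared with 3.2, the only addition is consecutive(), which requires the repeated events to be strictly adjacent; the result is therefore the same as in 3.1.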
// With the following log input, the job finally outputs loginId: 11111, 11112, 11113, 11116, 11117, 11121
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"aaron"}
{"loginId":11112,"loginTime":1645177353000,"loginStatus":1,"userName":"aaron"}
{"loginId":11113,"loginTime":1645177354000,"loginStatus":1,"userName":"aaron"}
{"loginId":11116,"loginTime":1645177355000,"loginStatus":1,"userName":"aaron"}
{"loginId":11117,"loginTime":1645177356000,"loginStatus":1,"userName":"aaron"}
{"loginId":11118,"loginTime":1645177357000,"loginStatus":1,"userName":"aaron"}
{"loginId":11119,"loginTime":1645177358000,"loginStatus":1,"userName":"aaron"}
{"loginId":11120,"loginTime":1645177359000,"loginStatus":0,"userName":"aaron"}
{"loginId":11121,"loginTime":1645177360000,"loginStatus":1,"userName":"aaron"}
{"loginId":11122,"loginTime":1645177361000,"loginStatus":1,"userName":"aaron"}
{"loginId":11123,"loginTime":1645177362000,"loginStatus":1,"userName":"aaron"}
4. Complete demo code
4.1 pom file
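<properties>
    <flink.version>1.14.3</flink.version>
    <hadoop.version>2.7.5</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.4.0</kafka.version>
    <redis.version>3.3.0</redis.version>
    <lombok.version>1.18.6</lombok.version>
    <fastjson.verson>1.2.72</fastjson.verson>
    <!-- property name assumed: the original name was lost, the value 1.8 is presumably the Java version -->
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
            <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
            <exclusion><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>force-shading</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion><groupId>log4j</groupId><artifactId>*</artifactId></exclusion>
            <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>${redis.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.verson}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
4.2 UserLoginLog class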
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
// public class + no-arg constructor + Lombok getters/setters let Flink treat it as a POJO
public class UserLoginLog {
private int loginId;
private long loginTime;
private int loginStatus;
private String userName;
}
4.3 MyFlatMapFunction class
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class MyFlatMapFunction implements FlatMapFunction<String, UserLoginLog> {
    @Override
    public void flatMap(String value, Collector<UserLoginLog> out) throws Exception {
        // skip blank lines; parse each JSON line into a UserLoginLog
        if (StringUtils.isNotBlank(value)) {
            UserLoginLog userLoginLog = JSONObject.parseObject(value, UserLoginLog.class);
            out.collect(userLoginLog);
        }
    }
}
4.4 MyPatternProcessFunction class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

@Slf4j
public class MyPatternProcessFunction extends PatternProcessFunction<UserLoginLog, UserLoginLog> {
    @Override
    public void processMatch(Map<String, List<UserLoginLog>> match, Context ctx, Collector<UserLoginLog> out) throws Exception {
        // emit the first event matched under the "start" pattern name
        List<UserLoginLog> start = match.get("start");
        out.collect(start.get(0));
    }
}
4.5 Main class
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
@Slf4j
public class CepLearning {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
        // parse JSON lines and use loginTime as event time, allowing 1 s of out-of-orderness
        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserLoginLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginLog>) (element, recordTimestamp) -> element.getLoginTime())
                );
        // 3.1: three strictly consecutive failed logins (loginStatus == 1) within 10 s
        Pattern<UserLoginLog, UserLoginLog> wherePatternOne = Pattern.<UserLoginLog>begin("start").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("second").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).next("third").where(new SimpleCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).within(Time.seconds(10));
        // 3.2: three failed logins within 10 s, not necessarily consecutive
        Pattern<UserLoginLog, UserLoginLog> wherePatternTwo = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).within(Time.seconds(10));
        // 3.3: times(3) plus consecutive() behaves like 3.1
        Pattern<UserLoginLog, UserLoginLog> wherePatternThree = Pattern.<UserLoginLog>begin("start").where(new IterativeCondition<UserLoginLog>() {
            @Override
            public boolean filter(UserLoginLog value, Context<UserLoginLog> ctx) throws Exception {
                return 1 == value.getLoginStatus();
            }
        }).times(3).consecutive().within(Time.seconds(10));
        PatternStream<UserLoginLog> patternStream = CEP.pattern(dataStream, wherePatternOne);
        PatternStream<UserLoginLog> patternStream1 = CEP.pattern(dataStream, wherePatternTwo);
        PatternStream<UserLoginLog> patternStream2 = CEP.pattern(dataStream, wherePatternThree);
        SingleOutputStreamOperator<UserLoginLog> process = patternStream.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process1 = patternStream1.process(new MyPatternProcessFunction());
        SingleOutputStreamOperator<UserLoginLog> process2 = patternStream2.process(new MyPatternProcessFunction());
        process.print("resultOutPut");
        process1.print("resultOutPutTwo");
        process2.print("resultOutPutThree");
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
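Because the job runs in event time, CEP buffers each event until the watermark reaches its timestamp. The plain-Java sketch below is modelled on the rule behind Flink's bounded-out-of-orderness watermarks (largest timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms); the class and method names are hypothetical and nothing here calls Flink. With the sample input and Duration.ofSeconds(1), the watermark stalls at 1645177360999 after 11123 arrives, so 11122 and 11123 stay buffered: matches that need them, such as the one starting at 11121, only appear once a later event (or, likely, the end of the input) pushes the watermark further.

```java
// Plain-Java illustration (not Flink code) of the bounded-out-of-orderness rule:
// watermark = max event timestamp seen so far - outOfOrderness - 1 ms.
final class WatermarkSketch {

    // Watermark after all given timestamps have been observed.
    static long watermarkAfter(long[] timestamps, long outOfOrdernessMs) {
        // start value chosen so the result is Long.MIN_VALUE before any event
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (long ts : timestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    // Events an event-time operator still holds back: timestamp > watermark.
    static int pendingAfter(long[] timestamps, long watermark) {
        int pending = 0;
        for (long ts : timestamps) {
            if (ts > watermark) {
                pending++;
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        long[] loginTimes = new long[11];
        for (int k = 0; k < loginTimes.length; k++) {
            loginTimes[k] = 1645177352000L + k * 1000L; // sample input, one event per second
        }
        long wm = watermarkAfter(loginTimes, 1000L);
        // prints watermark=1645177360999 pending=2
        System.out.println("watermark=" + wm + " pending=" + pendingAfter(loginTimes, wm));
    }
}
```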



