CEP【复合事件处理】 通过规则,从不同的事件流中找出相关的事件组合(发生事件),并对发现的做进一步处理(处理发生事件)。
CEP:先捕获各种细微的事件(基础事件或简单事件),然后通过分析整理【事件模式】,找出更有意义的事件(复合事件),最后决定采取什么行动。其中事件的分析整理以找出更有意义的事件,是CEP的核心,也是最困难的地方。 有关CEP概念的理解,请参考轻松理解CEP技术
CEP是一种基于动态环境中事件流的分析技术,事件在这里通常是指采集到的各种数据如交易记录,并且连续不间断。通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件模式,最终分析得到更复杂的复合事件。
1.2 FlinkCEPFlinkCEP(Complex event processing for Flink) 是基于Flink实现的复杂事件处理库.
在无界流或有界流中检测出事件模式【event pattern】,从而挖掘出数据的价值。
- 目标:从有序的简单事件流中发现复合事件【即定义的事件模式】
- 输入:一个或多个由简单事件构成的事件流【必须指定watermark,用于时序关系分析】
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
- 输出:满足规则的复杂事件
CEP的应用场景有很多,如股票曲线预测、网络入侵、物流订单追踪、电商订单、IOT场景等。
大体上分为如下三类:
- 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。如当相同的银行卡在10分钟内,从两个不同的地方发生刷卡现象,就会触发报警机制,以便于监测信用卡盗刷等现象
- 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
- 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
- 趋势分析
- 欺诈检测等
2.2. 开发流程org.apache.flink flink-cep_2.11 1.13.3
- 读取事件流并转换为DataStream
- 必须指定水印(watermark)
- 定义事件模式(event pattern)
- 在指定事件流上应用事件模式
- 匹配或选择符合条件的事件,并产生告警
部分代码片段参见如下:
//读取事件流 DataStreamSource2.3. 入门代码示例source = env.readTextFile("/data/input/events.txt"); DataStreamSource source = env.socketTextStream("bigdata01", 10088); SingleOutputStreamOperator flatMapStream = source.flatMap((FlatMapFunction ) (v, out) -> { out.collect(new Event(v.split(","))); }).returns(Types.POJO(Event.class));
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
public class MyCEPTest {
public static void main(String args[]) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DataStream dataStream =
env.fromElements(
new EventMsg(1L, LocalDateTime.parse("2020-04-15 08:05:01", dateTimeFormatter), "A", "INFO"),
new EventMsg(2L, LocalDateTime.parse("2020-04-15 08:06:11", dateTimeFormatter), "A", "error"),
new EventMsg(3L, LocalDateTime.parse("2020-04-15 08:07:21", dateTimeFormatter), "A", "critical"),
new EventMsg(4L, LocalDateTime.parse("2020-04-15 08:08:21", dateTimeFormatter), "A", "INFO"),
new EventMsg(5L, LocalDateTime.parse("2020-04-15 08:09:21", dateTimeFormatter), "B", "INFO"),
new EventMsg(6L, LocalDateTime.parse("2020-04-15 08:11:51", dateTimeFormatter), "B", "error"),
new EventMsg(7L, LocalDateTime.parse("2020-04-15 08:12:20", dateTimeFormatter), "B", "critical"),
new EventMsg(8L, LocalDateTime.parse("2020-04-15 08:15:22", dateTimeFormatter), "B", "INFO"),
new EventMsg(9L, LocalDateTime.parse("2020-04-15 08:17:34", dateTimeFormatter), "B", "error"));
SingleOutputStreamOperator watermarks = dataStream.assignTimestampsAndWatermarks(
// 最大乱序程度
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(
(SerializableTimestampAssigner) (element, recordTimestamp) -> toEpochMilli(element.getEventTime()))
);
Pattern pattern = Pattern.begin("start")
.next("middle").where(new SimpleCondition() {
@Override
public boolean filter(EventMsg value) throws Exception {
return value.getEventType().equals("error");
}
}).followedBy("end").where(new SimpleCondition() {
@Override
public boolean filter(EventMsg value) throws Exception {
return value.getEventType().equals("critical");
}
}).within(Time.seconds(180));
PatternStream patternStream = CEP.pattern(watermarks, pattern);
DataStream alerts = patternStream.select(new PatternSelectFunction() {
@Override
public String select(Map> msgs) throws Exception {
StringBuffer sb = new StringBuffer();
msgs.forEach((k,v)->{
sb.append(k+",");
sb.append(v.toString()+"n");
});
return sb.toString();
}
});
alerts.print();
env.execute("Flink CEP Test");
}
public static final ZoneOffset zoneOffset8 = ZoneOffset.of("+8");
public static long toEpochMilli(LocalDateTime dt) {
return dt.toInstant(zoneOffset8).toEpochMilli();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class EventMsg {
public long eventId;
public LocalDateTime eventTime;
public String eventName;
public String eventType;
@Override
public String toString(){
return String.format("%s-%s-%s-%s",eventId,eventName,eventType,eventTime);
}
}
}



