- 参考地址
- 基本概念
- pom 依赖
- code
- 数据源
- LoginEvent 实体类
- 业务逻辑代码
- 运行效果图
flink cep 官网链接
基本概念
FlinkCEP是在Flink上层实现的复杂事件处理库。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。本页讲述了Flink CEP中可用的API:我们首先讲述模式API,它可以让你指定想在数据流中检测的模式,然后讲述如何检测匹配的事件序列并进行处理。再然后我们讲述Flink在按照事件时间处理迟到事件时的假设,以及如何从旧版本的Flink向1.3之后的版本迁移作业。
pom 依赖
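<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
code
数据源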
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; public class LoginSource extends RichSourceFunctionLoginEvent 实体类{ List loginStatusList; List userIdList; List userNameList; Random random; Boolean isRunning; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); loginStatusList = Arrays.asList("failed", "failed"); // loginStatusList = Arrays.asList("success", "failed"); userIdList = Arrays.asList(1, 2, 3, 4, 5); userNameList = Arrays.asList("zhangsan", "lisi", "wangwu", "maliu", "yanqi"); random = new Random(); isRunning = true; } @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { Long currentTimeStamp = System.currentTimeMillis(); final int index = random.nextInt(5); int id = userIdList.get(index); String userName = userNameList.get(index); final int statusIndex = random.nextInt(2); final String status = loginStatusList.get(statusIndex); ctx.collect(new LoginEvent(id, userName, status, currentTimeStamp)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isRunning = false; loginStatusList = new ArrayList<>(); userIdList = new ArrayList<>(); userNameList = new ArrayList<>(); } }
/**
 * One login attempt flowing through the demo pipeline.
 *
 * <p>The boilerplate is generated by Lombok: {@code @Data} supplies getters,
 * setters, {@code toString()}, {@code equals()} and {@code hashCode()}, while
 * the two constructor annotations add a no-argument constructor and a
 * constructor taking every field in declaration order.
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class LoginEvent {

    /** Numeric id of the user who attempted to log in. */
    private int userId;

    /** Display name of that user. */
    private String userName;

    /** Outcome of the attempt, e.g. {@code "failed"} or {@code "success"}. */
    private String loginStatus;

    /** Time of the attempt as epoch milliseconds; used as the event timestamp. */
    private Long loginTime;
}
业务逻辑代码
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.List;
import java.util.Map;
public class FlinkCEPLoginEvent {
public static void main(String[] args) throws Exception{
//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000L);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//添加数据源
final DataStreamSource loginEventDataStreamSource = env.addSource(new LoginSource());
final SingleOutputStreamOperator loginEventSingleOutputStreamOperator = loginEventDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.getLoginTime()));
final KeyedStream loginEventIntegerKeyedStream = loginEventSingleOutputStreamOperator.keyBy(new KeySelector() {
@Override
public Integer getKey(LoginEvent value) throws Exception {
return value.getUserId();
}
});
final Pattern pattern = Pattern.begin("start").where(new SimpleCondition() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return value.getLoginStatus().equals("failed");
}
}).next("middle").where(new SimpleCondition() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return value.getLoginStatus().equals("failed");
}
}).within(Time.seconds(3));
final PatternStream patternStream = CEP.pattern(loginEventIntegerKeyedStream, pattern);
loginEventIntegerKeyedStream.print("original: ");
final SingleOutputStreamOperator afterPDs = patternStream.process(new PatternProcessFunction() {
@Override
public void processMatch(Map> map, Context context, Collector collector) throws Exception {
System.out.println(map.toString());
final LoginEvent start = map.get("start").get(0);
final LoginEvent middle = map.get("middle").get(0);
collector.collect(String.format("{%s} login failed, 1st timeStamp: %s, 2nd: %s.", start.getUserName(), start.getLoginTime(), middle.getLoginTime()));
}
});
// loginEventDataStreamSource.print();
afterPDs.print("afterPDs: ");
System.out.println(env.getExecutionPlan());
env.execute("Test flink CEP");
}
}
运行效果图



