目录
Flink CEP 概念以及使用场景
1.什么是CEP?
2.Flink CEP 应用场景
3.Flink CEP 原理(只做简单了解)
规则条件遵循参考
简单规则:
相对复杂规则:
CEP支持的模式
个体模式
模式序列
groovy+aviator的介绍
groovy是什么?
Aviator是什么?
groovy+aviator+cep的整合使用
正常cep代码开发流程
cep动态模板
cep动态模板+动态规则修改
运行演示
集群运行效果
具体代码
flink_cep_groovy_aviator项目
flink_cep_groovy_aviator2项目
相关代码
参考文章
最后
Flink CEP 概念以及使用场景
1.什么是CEP?
CEP的意思是复杂事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流形成的模式称为 CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。
在真实场景中,起床,洗漱,吃饭,上班就是一个个事件数据。
CEP的特征如下:
目标:从有序的简单事件流中发现一些高阶特征;
输入:一个或多个简单事件构成的事件流;
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件;
输出:满足规则的复杂事件。
下图中列出了几个例子,三个场景:
- 第一个是异常行为检测的例子:假设车辆维修的场景中,当一辆车出现故障时,这辆车会被送往维修点维修,然后被重新投放到市场运行。如果这辆车被投放到市场之后还未被使用就又被报障了,那么就有可能之前的维修是无效的。
- 第二个是策略营销的例子:假设打车的场景中,用户在 APP 上规划了一个行程订单,如果这个行程在下单之后超过一定的时间还没有被司机接单的话,那么就需要将这个订单输出到下游做相关的策略调整。
- 第三个是运维监控的例子:通常运维会监控服务器的 CPU、网络 IO 等指标超过阈值时产生相应的告警。但是在实际使用中,后台服务的重启、网络抖动等情况都会造成瞬间的流量毛刺,对非关键链路可以忽略这些毛刺而只对频繁发生的异常进行告警以减少误报。
2.Flink CEP 应用场景
- 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
- 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
- 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
3.Flink CEP 原理(只做简单了解)
- Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为 take、ignore、proceed 三种。
- take:必须存在一个条件判断,当到来的消息满足 take 边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
- ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
- proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。 也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条 proceed 边和下游的购买状态相连。
下面以一个打车的例子来展示状态是如何流转的,规则见下图所示。
以乘客制定行程作为开始,匹配乘客的下单事件,如果这个订单超时还没有被司机接单的话,就把行程事件和下单事件作为结果集往下游输出。
假如消息到来顺序为:行程-->其他-->下单-->其他。
状态流转如下:
1)开始时状态处于行程状态,即等待用户制定行程。
2)当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,通过 take 边将状态往下转移到下单状态。
3)由于下单状态上有一条 ignore 边,所以可以忽略收到的其他事件,直到收到下单事件时将其匹配,放入结果集中,并且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。
4)超时未接单状态时,如果来了一些其他事件,同样可以被 ignore 边忽略,直到超时事件的触发,将状态往下转移到最终状态,这时候整个模式匹配成功,最终将结果集中的制定行程事件和下单事件输出到下游。
上面是一个匹配成功的例子,如果是不成功的例子会怎么样?
假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,之前放入结果集中的行程事件和下单事件会被清理。
规则条件遵循参考
简单规则:
同一天同一设备IP变更次数 大于 XX 次
设备两次登录间隔小于 XX 秒
相对复杂规则:
同一账户,在XX分钟内转成金额 XX元后,又在XX分钟内转出累加金额大于 XXX元
CEP支持的模式
个体模式
包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。
(1)量词
可以在一个个体模式后追加量词,也就是指定循环次数
// 匹配出现4次
start.time(4)
// 匹配出现0次或4次
start.time(4).optional
// 匹配出现2、3或4次
start.time(2,4)
// 匹配出现2、3或4次,并且尽可能多地重复匹配
start.time(2,4).greedy
// 匹配出现1次或多次
start.oneOrMore
// 匹配出现0、2或多次,并且尽可能多地重复匹配
start.timesOrMore(2).optional.greedy
(2)条件
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:
① 简单条件
通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件
start.where(event=>event.getName.startsWith(“foo”))
② 组合条件
将简单的条件进行合并;or()方法表示或逻辑相连,where的直接组合就相当于与and。
Pattern.where(event => …).or(event => )
③ 终止条件
如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。
④ 迭代条件
能够对模式之前所有接收的事件进行处理;调用.where((value,ctx) => {…}),可以调用ctx.getEventForPattern(“name”)
模式序列
序列模式可以接受多个事件,这在我们下面讲解的案例里面可以看到。
(1)严格近邻
所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定。例如对于模式“a next b”,事件序列“a,c,b1,b2”没有匹配。
(2)宽松近邻
允许中间出现不匹配的事件,由.followedBy()指定。例如对于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配为{a,b1}。
(3)非确定性宽松近邻
进一步放宽条件,之前已经匹配过的事件也可以再次使用,由.followedByAny()指定。例如对于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配为{ab1},{a,b2}。
除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:
.notNext():不想让某个事件严格紧邻前一个事件发生。
.notFollowedBy():不想让某个事件在两个事件之间发生。
需要注意:
①所有模式序列必须以.begin()开始;
②模式序列不能以.notFollowedBy()结束;
③“not”类型的模式不能被optional所修饰;
④可以为模式指定时间约束,用来要求在多长时间内匹配有效。
next.within(Time.seconds(10))
groovy+aviator的介绍
groovy是什么?
Groovy是Java虚拟机的敏捷和动态语言, 提供静态类型检查的能力,并静态地编译成java字节码,以获得健壮性和性能,与所有现有的Java类和库无缝集成,可以在任何可以使用java的地方使用它。
简单解释,就是可以将字符串,静态类型编译成自己需要的java对象或者方法。
groovy是实现动态模板的关键。
案例说明:
Aviator是什么?
Google Aviator是一个高性能、轻量级的 java 语言实现的表达式求值引擎, 主要用于各种表达式的动态求值,aviator支持大部分运算操作符, 包括算术操作符、关系运算符、逻辑操作符、位运算符、正则匹配操作符(=~)、三元表达式(并且支持操作符的优先级和括号强制优先级, 具体请看后面的操作符列表, 支持自定义函数等。
简单解释,就是起到类型正则匹配的效果。
Aviator是实现条件匹配的关键。
案例说明:
了解相关链接跟案例:
google aviator:轻量级Java公式引擎_刘本龙的专栏-CSDN博客_java 公式引擎
groovy+aviator+cep的整合使用
正常cep代码开发流程
一,数据源
二,单场景
匹配结果:
三,多场景
匹配结果:
四,真实业务场景演示代码
代码类:
flink_cep_groovy_aviator –-> com.sjb.test.yarn.dev.CepOnYarnDev
数据源kafka:
flink_cep
数据格式:
{
"operation_type": "24",
"order_number": "2104081206210050",
"driver_code": "100000609882",
"device_type": "8",
"kafka_topic": "test_order_flow",
"canal_ts": "1617853408000",
"update_time": "1617853408436",
"orderflow_code": "dedc67e9dc55461b99da1548fd215c73",
"device_code": "oq6JXwo9KwqQ8jOPvlObJReUpZp4",
"operation_type_desc": "杜岳飞卸货地签到",
"company_code": "410307014961",
"company_member_code": "200000611452",
"create_date": "1617853408431",
"app_code": "100002"
}
数据结果写入的是hbase:
cep_result
查看结果:
Scan ‘cep_result’
cep动态模板
1, 什么是cep动态模板?
在上面的演示中我们看到,条件是固定的情况下,代码是固定写死的,那么我们有多少个任务,就需要开发出多少个代码类,然后编译打包上传运行。
为了业务的灵活需要,需要开发出一套代码,一个包,针对不同的条件提交任务运行。
2, 动态cep模板原理
通过解析参数,拼接代码,再加上groovy+avritor动态生成cep代码,来实现cep动态模板。
3, 案例演示
代码类:
flink_cep_groovy_aviator --> com.sjb.test.yarn.FlinkDynamicCepDemo_2021_04_07_003
cep动态模板+动态规则修改
动态规则修改实现:
修改了cep的源码,添加了一个pattern对象修改功能,核心代码
执行流程:
1,读取mysql的规则数据
2,接入广播流,监控规则数据是否修改
3,动态模板生产cep代码,如果有规则修改,重新生成cep代码
4,执行运行。
代码整合:略
运行演示
flink_cep_groovy_aviator ---> test.BroadcastCepDemo3
单条件(对应mysql表 id =3):
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev1
{
"rule_id": 1,
"title": "一个条件告警",
"label_content": [{
"condition_name": "begin",
"times": "timesOrMore(3)",
"within": "within(Time.days(1L))",
"sequence": "",
"description": "第一个条件",
"condition_value": "operation_type=='1'"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}],
"pattern": "combining",
"strategy": "",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
多条件 :
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev2
多条件(对应mysql表 id =1):
{
"rule_id": 1,
"title": "三个条件,用户刷单告警_03",
"label_content": [{
"condition_name": "begin",
"times": "",
"sequence": "next('middle')",
"description": "第一个条件",
"condition_value": "operation_type=='1'&&app_code =='100002'"
}, {
"condition_name": "middle",
"description": "第二个条件",
"sequence": "next('end')",
"condition_value": "operation_type=='1'&&string.contains(operation_type_desc,'胡飞成功抢单')"
}, {
"condition_name": "end",
"within": "within(Time.days(1L))",
"description": "第三个条件",
"condition_value": "operation_type=='1'&&device_type =='4'"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}, {
"condition_name": "middle",
"do": "get"
}, {
"condition_name": "end",
"do": "get"
}],
"pattern": "combining",
"strategy": "AfterMatchSkipStrategy.skipToFirst('begin')",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
条件麻烦一点的:
flink_cep_groovy_aviator2 ---> dev.BroadcastCepDev3
{
"rule_id": 4,
"title": "两个条件,用户刷单告警_03",
"label_content": [{
"condition_name": "begin",
"sequence": "next('middle')",
"description": "第一个条件",
"condition_value": "operation_type=='1'&&app_code =='100002'"
}, {
"condition_name": "middle",
"description": "第二个条件",
"sequence": "",
"condition_value": "operation_type=='1'&&string.contains(operation_type_desc,'成功抢单')&&(middle_create_date-begin_create_date) <3600000"
}],
"select": [{
"condition_name": "begin",
"do": "getList"
}, {
"condition_name": "middle",
"do": "get"
}],
"time_column": "create_date",
"strategy": "",
"alarm_content": "钉钉告知用户xxxxxxxx,xxxx",
"alarm_url": "www.aaa.com",
"create_time": 1589373560798
}
集群运行效果
还需要完善的就是:各条件之间有关联关系处理
比如:相邻数据时间差小于5秒,规则定义是难点,传参 "time_sub":"a-b < 5"
具体代码
flink_cep_groovy_aviator项目
相关代码
package com.sjb.test.yarn;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.sjb.constant.Constants;
import com.sjb.function.CommonFunction;
import com.sjb.schema.CustomerDeserializationSchema;
import com.sjb.test.HbaseWriter;
import com.sjb.test.pattern.PatternExample;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static com.sjb.test.jdbc.JdbcSelect.queryParam;
//todo com.sjb.test.yarn.FlinkDynamicCepDemo_2021_04_07_003
public class FlinkDynamicCepDemo_2021_04_07_003 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//todo 查询mysql 得到结果
ParameterTool parameterTool = ParameterTool.fromArgs(args);
long timestampParam = parameterTool.getLong("timestamp", 0L);
String checkPointParam = parameterTool.get("checkpoint", "");
String patternParam = parameterTool.get("prod", "");
// int paramId = parameterTool.getInt("id");
int paramId = 1;
String param = queryParam(paramId);
System.out.println("param = " + param);
Properties consumerProperties = getConsumerProperties();
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer(
"cep_test",
new CustomerDeserializationSchema(),
consumerProperties);
//todo 策略模式
kafkaConsumer.setStartFromEarliest();
DataStreamSource sourceStream = env.addSource(kafkaConsumer);
KeyedStream stream = sourceStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
@Override
public long extractTimestamp(JSonObject loginEvent) {
return loginEvent.getLong("create_date");
}
}).keyBy(new KeySelector() {
@Override
public String getKey(JSonObject dataJson) throws Exception {
return dataJson.getString("device_code");
}
});
JSonObject paramJson = JSON.parseObject(param);
JSonArray selectArray = paramJson.getJSonArray("select");
String labelContent = paramJson.getString("label_content");
String strategy = paramJson.getString("strategy");
String title = paramJson.getString("title");
//todo 传入的数据应该是一个字符串
System.out.println("paramJson = " + paramJson);
//todo 打印数据:
// sourceStream.print("==>");
//todo 传入参数,返回一个 Pattern对象
Pattern pattern = new PatternExample(paramJson).pattern();
try {
PatternStream patternStream = CEP.pattern(stream, pattern);
//todo 这里要不要做动态生成
SingleOutputStreamOperator select = patternStream.flatSelect(new PatternFlatSelectFunction>() {
@Override
public void flatSelect(Map> map, Collector> out) throws Exception {
if (selectArray.size() > 1) {
List list = new ArrayList<>();
for (int i = 0; i < selectArray.size(); i++) {
JSonObject select = selectArray.getJSonObject(i);
String conditionName = select.getString("condition_name");
List begin = map.get(conditionName);
list.addAll(begin);
}
out.collect(list);
} else {
JSonObject select = selectArray.getJSonObject(0);
String conditionName = select.getString("condition_name");
List rs = map.get(conditionName);
out.collect(rs);
}
}
});
select.print("out输出:");
select.addSink(new HbaseWriter());
//todo 侧流输出
} catch (Exception e) {
e.printStackTrace();
}
try {
env.execute(title);
} catch (Exception e) {
e.printStackTrace();
}
}
public static Properties getConsumerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);
properties.put("group.id", Constants.GROUP_ID);
properties.setProperty("flink.partition-discovery.interval-millis", 300 * 1000 + ""); // 自动发现消费的partition变化
return properties;
}
public static Properties getSinkProperties() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);
props.setProperty("buffer.memory", Constants.BUFFER_MEMORY);
props.setProperty("batch.size", Constants.BATCH_SIZE);
props.setProperty("linger.ms", Constants.LINGER_MS);
props.setProperty("max.request.size", Constants.MAX_REQUEST_SIZE);
props.setProperty("acks", Constants.ACKS);
props.setProperty("retries", Constants.RETRIES);
props.setProperty("retry.backoff.ms", Constants.RETRY_BACKOFF_MS);
return props;
}
}
flink_cep_groovy_aviator2项目
相关代码
package dev;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.sjb.constant.Constants;
import com.sjb.function.CommonFunction;
import com.sjb.schema.CustomerDeserializationSchema;
import com.sjb.test.pattern.PatternExample;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.RichPatternFlatSelectFunction;
import org.apache.flink.cep.listener.CepListener;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static com.sjb.test.jdbc.JdbcSelect.queryParam;
//todo test.BroadcastCepDev2
public class BroadcastCepDev2 {
private static String logStr = "{"operation_type":"1","order_number":"1","type":"in","ip":"深圳","device_code":"a","create_date":"1616653174000"}";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//todo 修改为eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
//todo 传入mysql对应的规则id
// String ruleId = parameterTool.getRequired("rule_id");
// String ruleId = "2";
String ruleId = "1";
String ruleInfoStr = queryParam(ruleId);
System.out.println("打印从mysql查询出来的规则ruleInfoStr = " + ruleInfoStr);
JSonObject ruleJSON = JSON.parseObject(ruleInfoStr);
System.out.println("ruleJSON = " + ruleJSON);
//todo 读取kafka的数据
Properties consumerProperties = getConsumerProperties();
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer(
"flink_cep",
new CustomerDeserializationSchema(),
consumerProperties);
//todo 策略模式
kafkaConsumer.setStartFromEarliest();
DataStreamSource sourceStream = env.addSource(kafkaConsumer);
KeyedStream keyedWatermarkStream = sourceStream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5L)) {
@Override
public long extractTimestamp(JSonObject element) {
return element.getLong("create_date");
}
}).map(new RichMapFunction() {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public JSonObject map(JSonObject dataJson) throws Exception {
Long middleDate = dataJson.getLong("create_date");
String dateStr = format.format(middleDate);
dataJson.put("dateStr",dateStr);
return dataJson;
}
}).keyBy(new KeySelector() {
@Override
public String getKey(JSonObject dataJson) throws Exception {
return dataJson.getString("device_code");
}
});
//sourceStream.print("主数据流:");
MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("register", Types.STRING, Types.STRING);
FlinkKafkaConsumer broadcastKafkaConsumer = new FlinkKafkaConsumer(
"cep.fengkong.test",
new CustomerDeserializationSchema(),
consumerProperties);
//todo 策略模式
broadcastKafkaConsumer.setStartFromEarliest();
//todo 这里要修改成kafka的binglog数据
DataStreamSource dataStreamSource = env.addSource(broadcastKafkaConsumer);
DataStream ruleStream = dataStreamSource.flatMap(new RichFlatMapFunction() {
@Override
public void flatMap(JSonObject data, Collector out) throws Exception {
JSonArray dataJSonArray = data.getJSonArray("data");
for (int i = 0; i < dataJSONArray.size(); i++) {
JSonObject json = dataJSONArray.getJSonObject(i);
out.collect(json);
}
}
});
// ruleStream.print("--->");
BroadcastStream broadcastKafkaStream = ruleStream.broadcast(mapStateDescriptor);
// //todo 这里要修改成kafka的binglog数据
DataStream connectStream = keyedWatermarkStream
.connect(broadcastKafkaStream)
.process(new KeyedBroadcastProcessFunction() {
private MapStateDescriptor mapStateDescriptor2;
private Boolean notSend = true;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 这里需要初始化map state 描述
mapStateDescriptor2 = new MapStateDescriptor("register", Types.STRING, Types.STRING);
}
@Override
public void processElement(JSonObject datalog, ReadonlyContext ctx, Collector out) throws Exception {
//todo 处理主流数据
// System.out.println("处理主流数据");
ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(mapStateDescriptor2);
//todo 将广播数据发送到数据流里面去
if (notSend && broadcastState.contains(ruleId)) {
String broadcastStatevalue = broadcastState.get(ruleId);
notSend = false;
JSonObject broadcastStateJson = JSON.parseObject(broadcastStatevalue);
broadcastStateJson.put("device_code",datalog.getString("device_code"));
out.collect(broadcastStateJson);
}
out.collect(datalog);
}
@Override
public void processBroadcastElement(JSonObject value, Context ctx, Collector collector) throws Exception {
//todo 处理广播流
System.out.println("新增加需要监控的" + value.toJSonString());
BroadcastState broadcastState = ctx.getBroadcastState(mapStateDescriptor2);
boolean exist = broadcastState.contains(value.getString("id"));
if (exist) {
notSend = true;
}
broadcastState.put(value.getString("id"), value.toJSonString());
}
}).keyBy(new KeySelector() {
@Override
public String getKey(JSonObject dataJson) throws Exception {
return dataJson.getString("device_code");
}
});
// connectStream.print("==---->");
//todo 在这里添加动态生成代码
Pattern pattern = new PatternExample(ruleJSON).pattern();
PatternStream patternStream = CEP.pattern(connectStream, pattern);
PatternStream patternStream1 = patternStream.registerListener(new CepListener() {
@Override
public void init() {
System.out.println("初始化。。。");
}
@Override
public Boolean needChange(JSonObject dataJson) {
if (dataJson.containsKey("id") && dataJson.containsKey("name") && dataJson.containsKey("content")) {
System.out.println("这条数据是规则数据:" + dataJson);
return true;
}
return false;
}
@Override
public Pattern returnPattern(JSonObject dataJson) throws Exception {
JSonObject ruleJson = dataJson.getJSonObject("content");
System.out.println("接收到规则数据数据:" + ruleJson + ",切换逻辑");
Pattern pattern = new PatternExample(ruleJson).pattern();
return pattern;
}
});
//todo 这里有问题 就是没办法改动select条件,目前select条件一旦定了就不能修改了 。
patternStream1.flatSelect(new RichPatternFlatSelectFunction>() {
@Override
public void flatSelect(Map> map, Collector> out) throws Exception {
// JSonArray updateSelect = map.get("begin").get(0).getJSonArray("select");
String selectStr1 = CommonFunction.selectStr;
JSonArray updateSelect = JSON.parseArray(selectStr1);
if (updateSelect.size() > 1) {
List list = new ArrayList<>();
for (int i = 0; i < updateSelect.size(); i++) {
JSonObject select = updateSelect.getJSonObject(i);
String conditionName = select.getString("condition_name");
// String getListOrget = select.getString("do");
List begin = map.get(conditionName);
list.addAll(begin);
}
out.collect(list);
} else {
JSonObject select = updateSelect.getJSonObject(0);
String conditionName = select.getString("condition_name");
List rs = map.get(conditionName);
out.collect(rs);
}
}
}).print("out输出==>");
env.execute("xxxxx");
}
public static Properties getConsumerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);
properties.put("group.id", Constants.GROUP_ID);
properties.setProperty("flink.partition-discovery.interval-millis", 300 * 1000 + ""); // 自动发现消费的partition变化
return properties;
}
}
参考文章
Apache Flink 1.12 documentation: FlinkCEP - Flink的复杂事件处理
google aviator:轻量级Java公式引擎_刘本龙的专栏-CSDN博客_java 公式引擎
flink cep pattern动态加载_帆了个帆的专栏-CSDN博客
最后
有评论说广播流那里可以改成阿波罗或者nacos 其实都一样,不过使用阿波罗或者nacos 一定要注意依赖冲突
如果有兴趣的老铁可以一键三连,然后评论留言继续交流,私信之前请一键三连。



