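If you are reading this article, you have presumably already worked through the official example on the Flink website, so I won't explain it in detail here. I will only add my own understanding of some details in the code. If you haven't read it yet, read the official example first.
Official example
API
pom file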
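<!-- The tag names java.version and scala.version are assumed; flink.version and scala.binary.version follow from the references below -->
<properties>
    <java.version>1.8</java.version>
    <flink.version>1.12.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
</dependencies>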
FraudDetectionJob.java
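import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the data source
        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        // The operator, i.e. the fraud detection business logic.
        // FraudDetector extends Flink's predefined KeyedProcessFunction.
        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        // Output the results. A custom sink class can implement SinkFunction
        // to write to external systems such as a database or a message queue.
        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        // Run the job
        env.execute("Fraud Detection");
    }
}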
FraudDetector.java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

// Long: the key type (the dimension we group by), Transaction: the input type, Alert: the output type
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    // State flag: in this business logic it is set when a transaction under 1 dollar appears
    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        // Initialize flagState
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
            "flag",
            Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        // Initialize timerState
        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
            "timer-state",
            Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // The current key works like a SQL GROUP BY column; here it is the bank account id
        Boolean lastTransactionWasSmall = flagState.value();

        if (lastTransactionWasSmall != null) {
            // If this transaction is above 500, a fraudulent transaction is detected: emit an alert
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // Clear the flag
            cleanUp(context);
        }

        // If this transaction is below 1, set the flag so that the next record
        // knows the previous one met the precondition for fraud
        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);

            // When the timer fires, KeyedProcessFunction#onTimer is called.
            // Override that method to implement your own state-reset callback.
            // If more than one minute passes between the two transactions, they are not treated as fraud.
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // Uncomment these two lines if the timer should be deleted as well
        //Long timer = timerState.value();
        //ctx.timerService().deleteProcessingTimeTimer(timer);

        // Clear the state
        timerState.clear();
        flagState.clear();
    }
}
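To make the rule itself easier to follow, here is a minimal plain-Java sketch of the same "small transaction, then large transaction within one minute" check. It is not Flink code: the class `FraudRuleSketch`, the `Tx` type, and the explicit `timestampMs` field are hypothetical stand-ins, and a per-account expiry time in a map replaces `flagState` plus the processing-time timer. The real job uses processing time rather than a timestamp carried by the record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the detection rule; no Flink classes are involved.
final class FraudRuleSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    // Hypothetical stand-in for the walkthrough's Transaction entity.
    static final class Tx {
        final long accountId;
        final long timestampMs;
        final double amount;

        Tx(long accountId, long timestampMs, double amount) {
            this.accountId = accountId;
            this.timestampMs = timestampMs;
            this.amount = amount;
        }
    }

    // Returns the account ids that would raise an alert, in arrival order.
    static List<Long> detect(List<Tx> txs) {
        // Per account: the time at which the "last transaction was small" flag expires.
        Map<Long, Long> flagExpiry = new HashMap<>();
        List<Long> alerts = new ArrayList<>();
        for (Tx tx : txs) {
            // Any following transaction consumes the flag, like cleanUp() does.
            Long expiry = flagExpiry.remove(tx.accountId);
            if (expiry != null && tx.timestampMs < expiry && tx.amount > LARGE_AMOUNT) {
                alerts.add(tx.accountId);
            }
            // A small transaction sets the flag for one minute.
            if (tx.amount < SMALL_AMOUNT) {
                flagExpiry.put(tx.accountId, tx.timestampMs + ONE_MINUTE);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Tx> txs = new ArrayList<>();
        txs.add(new Tx(1L, 0L, 0.50));        // small, sets the flag for account 1
        txs.add(new Tx(1L, 30_000L, 600.00)); // large within one minute
        System.out.println(detect(txs));      // prints [1]
    }
}
```

Because the flag is consumed by whichever transaction comes next for that account, a medium-sized transaction between the small and the large one prevents an alert, just as `cleanUp` does in `processElement`.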
Output
21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Requesting subpartition 12 of PipelinedResultPartition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934 [PIPELINED_BOUNDED, 16 subpartitions, 16 pending consumptions].
21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: transactions (1/1)#0 (5f82a70ee35273132d759b1c0be2c934): Creating read view for subpartition 12 of partition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934.
21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Created PipelinedSubpartitionView(index: 12) of ResultPartition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934
21:41:42.567 [fraud-detector -> Sink: send-alerts (15/16)#0] INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
21:41:48.108 [fraud-detector -> Sink: send-alerts (15/16)#0] INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
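The official example does not comment out the timer-deletion code (#1 and #2 below). cleanUp therefore deletes the pending timer as soon as the next transaction for the same account arrives, which usually happens well before the minute is up, so a breakpoint in onTimer is never hit while debugging. For onTimer to fire, a timer must first be registered with context.timerService().registerProcessingTimeTimer(timer) and must still be registered when its time arrives. That is why #1 and #2 are commented out in my FraudDetector.java.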
private void cleanUp(Context ctx) throws Exception {
    // Delete the timer, if that is needed
#1  Long timer = timerState.value();
#2  ctx.timerService().deleteProcessingTimeTimer(timer);
    // Clear the state
#3  timerState.clear();
#4  flagState.clear();
}
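The effect of lines #1 and #2 can be illustrated with a small plain-Java simulation. This is only a sketch under stated assumptions: `TimerSketch` and its `Timers` class are hypothetical and far simpler than Flink's real timer service, and the 500 ms gap between the two transactions is an arbitrary value chosen for the illustration.

```java
import java.util.TreeSet;

// Plain-Java illustration of register/delete timer semantics; not Flink's TimerService.
final class TimerSketch {
    static final long ONE_MINUTE = 60 * 1000;

    // Hypothetical, much simplified per-key processing-time timer service.
    static final class Timers {
        private final TreeSet<Long> pending = new TreeSet<>();
        int fired = 0;

        void register(long time) { pending.add(time); }

        void delete(long time) { pending.remove(time); }

        // Fires (and counts) every pending timer whose time is <= now.
        void advanceTo(long now) {
            while (!pending.isEmpty() && pending.first() <= now) {
                pending.pollFirst();
                fired++; // stands in for a call to onTimer
            }
        }
    }

    // Replays: a small transaction at t = 0, the next transaction for the same key
    // at t = 500 ms, then the clock moves on to t = 2 minutes.
    // Returns how many times the "onTimer" callback ran.
    static int onTimerCalls(boolean deleteTimerInCleanUp) {
        Timers timers = new Timers();

        // t = 0: the small transaction registers a timer one minute ahead.
        long timer = 0 + ONE_MINUTE;
        timers.register(timer);
        Long timerState = timer;

        // t = 500 ms: the next transaction runs cleanUp.
        timers.advanceTo(500);
        if (deleteTimerInCleanUp) {
            timers.delete(timerState); // corresponds to lines #1 and #2
        }
        timerState = null;             // corresponds to line #3

        // Let the clock pass the one-minute mark.
        timers.advanceTo(2 * ONE_MINUTE);
        return timers.fired;
    }

    public static void main(String[] args) {
        System.out.println(onTimerCalls(true));  // prints 0: timer deleted before it fired
        System.out.println(onTimerCalls(false)); // prints 1: timer still fires
    }
}
```

Note that clearing `timerState` (line #3) only forgets the timestamp. It does not cancel the registered timer, which is why the timer still fires when #1 and #2 are commented out.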



