
Analysis of the Flink official example: fraud detection with the DataStream API


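If you are reading this, you have most likely already worked through the official example on the Flink website, so I won't explain it in detail here. I only add my own understanding of some details in the code. If you haven't read it yet, start with the official example.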

Official example
API

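pom.xml

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>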

FraudDetectionJob.java

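import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Define the data source
        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");
        // Operator: the fraud-detection business logic. FraudDetector implements one of
        // Flink's predefined function interfaces (KeyedProcessFunction).
        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");
        // Output: you can write your own sink by implementing SinkFunction to send the
        // results to external systems such as databases or message queues.
        alerts
                .addSink(new AlertSink())
                .name("send-alerts");
        // Run the job
        env.execute("Fraud Detection");
    }
}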
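keyBy(Transaction::getAccountId) plays the role of a SQL GROUP BY on the account ID: every transaction of a given account is routed to the same parallel instance of FraudDetector, which is what makes per-account state possible. The plain-Java sketch below imitates that routing under a simplifying assumption: it uses hash modulo parallelism, whereas Flink actually hashes keys into key groups and then assigns key groups to subtasks. `Txn`, `KeyByRouting` and `subtaskFor` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the walkthrough's Transaction entity.
class Txn {
    final long accountId;
    final double amount;

    Txn(long accountId, double amount) {
        this.accountId = accountId;
        this.amount = amount;
    }
}

// Hypothetical helper that mimics the effect of keyBy, not Flink's real partitioner.
class KeyByRouting {
    // Simplified routing rule (assumption): hash the key, then take it modulo the
    // parallelism. Flink's real scheme goes through key groups, but the property
    // shown here is the same: equal keys always land on the same subtask.
    static int subtaskFor(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    // Groups the input by subtask index, keeping arrival order within each subtask.
    static Map<Integer, List<Txn>> route(List<Txn> input, int parallelism) {
        Map<Integer, List<Txn>> bySubtask = new HashMap<>();
        for (Txn t : input) {
            bySubtask.computeIfAbsent(subtaskFor(t.accountId, parallelism), k -> new ArrayList<>())
                     .add(t);
        }
        return bySubtask;
    }
}
```

The property being illustrated, that one account never spans two subtasks, is the one FraudDetector's keyed state relies on; the concrete subtask numbers will differ from Flink's.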

FraudDetector.java

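import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

// Long: the key type (the dimension being grouped on), Transaction: the input type, Alert: the output type
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {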

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;
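    // Flag state: in this business logic it is set whenever a transaction under $1 occurs
    private transient ValueState<Boolean> flagState;

    // Timestamp of the currently registered timer, kept so that the timer can be deleted later
    private transient ValueState<Long> timerState;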

    
    @Override
    public void open(Configuration parameters) {
        // Initialize flagState
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
        // Initialize timerState
        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    
    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // The current key works like a SQL GROUP BY column; here it is the bank account ID,
        // so flagState returns the value for the current account only
        Boolean lastTransactionWasSmall = flagState.value();

        if (lastTransactionWasSmall != null) {
            // If this transaction is over 500, fraud has been detected: emit an alert
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            // Clear the flag
            cleanUp(context);
        }
        // If this transaction is under 1, set the flag to tell the next transaction
        // that the previous one met the precondition for fraud
        if (transaction.getAmount() < SMALL_AMOUNT) {
            // Set the flag to true
            flagState.update(true);
            // When the timer fires, KeyedProcessFunction#onTimer is called; override that
            // method to implement your own state-reset callback.
            // If the two transactions are more than one minute apart, it no longer counts as fraud
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }
    }

    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }

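    private void cleanUp(Context ctx) throws Exception {
        // Delete the timer if needed (commented out here, unlike the official example,
        // so that onTimer can be observed firing while debugging)
        //Long timer = timerState.value();
        //ctx.timerService().deleteProcessingTimeTimer(timer);

        // Clear the state
        timerState.clear();
        flagState.clear();
    }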
}
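To trace the flag/timer interplay without a Flink cluster, the plain-Java sketch below replays FraudDetector's rules over a list of timestamped transactions: a transaction under 1.00 sets a per-account flag and a timer one minute later; the next transaction over 500.00 produces an alert; any next transaction clears the flag and deletes the timer; a timer that fires clears the flag. It follows the official version, which deletes the timer in cleanUp(). `FraudSimulation`, `Event` and `run` are hypothetical names, the clock is simulated, and Flink's keyed state and timer service are approximated by plain maps and a priority queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical single-threaded model of FraudDetector, not Flink code.
class FraudSimulation {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    // Simulated input: processing time (ms), account ID and amount.
    static final class Event {
        final long time;
        final long accountId;
        final double amount;

        Event(long time, long accountId, double amount) {
            this.time = time;
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    // Per-account state, playing the role of flagState and timerState.
    private final Map<Long, Boolean> flag = new HashMap<>();
    private final Map<Long, Long> timer = new HashMap<>();
    // Pending timers as {fireTime, accountId}, earliest first (stands in for the timer service).
    private final PriorityQueue<long[]> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    private final List<Long> alerts = new ArrayList<>();

    // Replays events in the given (time-ordered) sequence and returns the alerted account IDs.
    List<Long> run(List<Event> events) {
        for (Event e : events) {
            fireTimersUpTo(e.time);
            processElement(e);
        }
        return alerts;
    }

    private void fireTimersUpTo(long now) {
        while (!timers.isEmpty() && timers.peek()[0] <= now) {
            long[] t = timers.poll();
            // onTimer: the one-minute window has passed, so forget the small transaction.
            flag.remove(t[1]);
            timer.remove(t[1]);
        }
    }

    private void processElement(Event e) {
        if (flag.get(e.accountId) != null) {
            if (e.amount > LARGE_AMOUNT) {
                alerts.add(e.accountId);
            }
            cleanUp(e.accountId);
        }
        if (e.amount < SMALL_AMOUNT) {
            flag.put(e.accountId, true);
            long fireAt = e.time + ONE_MINUTE;
            timers.add(new long[] {fireAt, e.accountId});
            timer.put(e.accountId, fireAt);
        }
    }

    private void cleanUp(long accountId) {
        Long pending = timer.remove(accountId);
        if (pending != null) {
            // Delete the registered timer, as deleteProcessingTimeTimer does in the official code.
            final long fireAt = pending;
            timers.removeIf(t -> t[0] == fireAt && t[1] == accountId);
        }
        flag.remove(accountId);
    }
}
```

In the test scenario, account 1 is alerted (large transaction 10 s after a small one), account 2 is not (70 s gap, the timer has already cleared the flag), and account 3 is alerted because its second small transaction restarts the one-minute window.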

Output

21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Requesting subpartition 12 of PipelinedResultPartition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934 [PIPELINED_BOUNDED, 16 subpartitions, 16 pending consumptions].
21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: transactions (1/1)#0 (5f82a70ee35273132d759b1c0be2c934): Creating read view for subpartition 12 of partition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934.
21:41:39.475 [fraud-detector -> Sink: send-alerts (13/16)#0] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Created PipelinedSubpartitionView(index: 12) of ResultPartition 1892964827cbb9f48fac50c7ec72bba7#0@5f82a70ee35273132d759b1c0be2c934
21:41:42.567 [fraud-detector -> Sink: send-alerts (15/16)#0] INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
21:41:48.108 [fraud-detector -> Sink: send-alerts (15/16)#0] INFO org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}

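The official example does not comment out the timer-deletion lines #1 and #2 shown below, so while debugging, a breakpoint in onTimer is never hit: in practice each account in the sample data receives its next transaction well within a minute, so cleanUp() deletes the pending timer before it can fire. Also keep in mind that onTimer can only be triggered after a timer has been registered via context.timerService().registerProcessingTimeTimer(timer).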

private void cleanUp(Context ctx) throws Exception {
        // Delete the timer if needed
        Long timer = timerState.value();                      // #1
        ctx.timerService().deleteProcessingTimeTimer(timer);  // #2

        // Clear the state
        timerState.clear();                                   // #3
        flagState.clear();                                    // #4
    }
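Commenting out #1 and #2 makes onTimer observable, but it also leaves a stale timer registered. If the same account later records a new small transaction, the old timer can fire first and clear the new flag before its minute is up, so a genuine small-then-large pattern may be missed. One common alternative (not from this article or the official example) is to keep the deletion, or, if timers are left registered, to let onTimer act only when the firing timestamp equals the timer stored in timerState. The plain-Java sketch below models just that guard for a single account; `GuardedFlag` and its methods are hypothetical, and in Flink the check would compare `timestamp` with `timerState.value()` inside onTimer.

```java
// Hypothetical model of one account's flagState/timerState, not Flink API.
class GuardedFlag {
    private Boolean flag;   // stands in for flagState.value()
    private Long timer;     // stands in for timerState.value()

    // A transaction under 1.00 at processing time `now`: set the flag and remember its timer.
    long markSmall(long now, long windowMs) {
        flag = true;
        timer = now + windowMs;
        return timer; // the timestamp that would be passed to registerProcessingTimeTimer
    }

    // cleanUp() with #1 and #2 commented out: only the state is cleared, the timer stays registered.
    void cleanUpWithoutDelete() {
        flag = null;
        timer = null;
    }

    // onTimer with a guard: ignore any timer that no longer matches the stored one.
    void onTimer(long timestamp) {
        if (timer != null && timer == timestamp) {
            flag = null;
            timer = null;
        }
    }

    boolean isFlagged() {
        return flag != null;
    }
}
```

Without the `timer == timestamp` check, the stale timer in the test scenario would clear the flag at 60 s, so a large transaction at, say, 65 s would not raise an alert even though it came 15 s after a small one.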
When reposting, please credit the source: this article is from www.mshxw.com
Article URL: https://www.mshxw.com/it/758135.html