栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink 1.10.1 cep java版本案例(登录连续失败检测)

flink 1.10.1 cep java版本案例(登录连续失败检测)

 本文的基础环境可以参考flink 1.10.1 java版本wordcount演示 (nc + socket)

cep作为复杂事件模式检测技术,可以用于检测系统是否存在暴力破解攻击。暴力破解攻击的一个最明显特征就是同一账户在短期内出现大量的登录失败尝试,通过判断一段时间内的同一用户的连续登录失败次数是否大于某一特定数值,可以作为判断是否存在攻击的参考。

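1. 添加相关依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>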
2. 程序代码
package com.demo.cep;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;


public class FlinkCEPLoginCheckDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(1000);

        //输入数据,提取事件时间
        KeyedStream loginEventKeyedStream = env.fromElements(
                new LoginEvent(1, "success", 1575600181000L),
                new LoginEvent(2, "fail1", 1575600182000L),
                new LoginEvent(2, "fail2", 1575600183000L),
                new LoginEvent(3, "fail1", 1575600184000L),
                new LoginEvent(3, "fail2", 1575600189000L)
        ).assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(LoginEvent loginEvent) {
                        return loginEvent.getEventTime();
                    }
                }       )
                .keyBy(new KeySelector() {
                    @Override
                    public Integer getKey(LoginEvent loginEvent) throws Exception {
                        return loginEvent.getUserId();
                    }
                });

        //定义Pattern
        Pattern pattern = Pattern.begin("begin").where(new SimpleCondition() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return "fail1".equals(loginEvent.getEventType());
            }
        }).next("next").where(new SimpleCondition() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return "fail2".equals(loginEvent.getEventType());
            }
        }).within(Time.seconds(5));

        //检测模式
        PatternStream patternStream = CEP.pattern(loginEventKeyedStream, pattern);

        //侧输出标志
        OutputTag outputTag = new OutputTag("timeout") {};

//        //process方式提取数据
//        SingleOutputStreamOperator process = patternStream.process(new MyPatternProcessFunction(outputTag));
//        process.print("process login failed twice");
//        //提取超时数据
//        process.getSideOutput(outputTag).print("process timeout");

        //select方式提取数据
        SingleOutputStreamOperator outputStreamOperator = patternStream
                .select(
                        outputTag,
                        new PatternTimeoutFunction() {
                    @Override
                    public LoginEvent timeout(Map> map, long l) throws Exception {

                        return map.get("begin").iterator().next();
                    }
                },
                    new PatternSelectFunction() {
                    @Override
                    public Warning select(Map> map) throws Exception {
                        LoginEvent begin = map.get("begin").iterator().next();
                        LoginEvent next = map.get("next").iterator().next();

                        return new Warning(begin.getUserId(), begin.getEventTime(), next.getEventTime(), "Login failed twice");
                    }
                });

        // 提取超时的数据
        DataStream timeoutDataStream = outputStreamOperator.getSideOutput(outputTag);
        timeoutDataStream.print("timeout");

        // 提取匹配数据
        outputStreamOperator.print("Login failed twice");


        env.execute();
    }

    //
    static class MyPatternProcessFunction extends PatternProcessFunction implements TimedOutPartialMatchHandler {
        private OutputTag outputTag;

        public MyPatternProcessFunction(OutputTag outputTag) {
            this.outputTag = outputTag;
        }

        @Override
        public void processMatch(Map> map, Context context, Collector collector) throws Exception {
            LoginEvent begin = map.get("begin").iterator().next();
            LoginEvent next = map.get("next").iterator().next();

            collector.collect(new Warning(begin.getUserId(), begin.getEventTime(), next.getEventTime(), "Login failed twice"));
        }

        @Override
        public void processTimedOutMatch(Map> map, Context context) throws Exception {
            context.output(outputTag,map.get("begin").iterator().next());
        }
    }

}

有两种方式获取匹配结果:一种是 select 方式(上面代码中实际启用的方式),另一种是 process 方式(见被注释掉的代码和 MyPatternProcessFunction)。

通过 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 启用事件时间。再通过 assignTimestampsAndWatermarks 从每条 LoginEvent 中提取事件时间戳并生成 watermark,这里允许最多 5 秒的乱序。

这里通过 within(Time.seconds(5)) 把时间范围限定为 5 秒。只有同一用户出现 fail1 事件后,紧接着(next,严格连续)出现 fail2 事件,并且两者都落在 5 秒内,才算一次"连续两次登录失败"的匹配。

3. 辅助代码LoginEvent
package com.demo.cep;

/**
 * A single login attempt.
 *
 * <p>Kept as a mutable bean (public no-arg constructor plus getters/setters) so Flink can treat
 * it as a POJO type.
 */
public class LoginEvent {
    /** Id of the user who attempted to log in. */
    private Integer userId;
    /** Outcome label of the attempt, e.g. "success", "fail1", "fail2" in the demo data. */
    private String eventType;
    /** Event time of the attempt in epoch milliseconds. */
    private Long eventTime;

    /** Creates an empty event (required for POJO deserialization). */
    public LoginEvent() {
    }

    /**
     * @param userId    id of the user who attempted to log in
     * @param eventType outcome label of the attempt
     * @param eventTime event time in epoch milliseconds
     */
    public LoginEvent(Integer userId, String eventType, Long eventTime) {
        this.userId = userId;
        this.eventType = eventType;
        this.eventTime = eventTime;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getEventTime() {
        return eventTime;
    }

    public void setEventTime(Long eventTime) {
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        // '\'' closes the quote around eventType (the scraped source had the invalid literal ''').
        return "LoginEvent{" +
                "userId=" + userId +
                ", eventType='" + eventType + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}
4. 辅助代码Warning
package com.demo.cep;

/**
 * Alert produced when a user fails to log in twice in a row.
 *
 * <p>Kept as a mutable bean (public no-arg constructor plus getters/setters) so Flink can treat
 * it as a POJO type.
 */
public class Warning {

    /** Id of the user the alert is about. */
    private Integer userId;
    /** Event time of the first failed login, epoch milliseconds. */
    private Long startTime;
    /** Event time of the second failed login, epoch milliseconds. */
    private Long endTime;
    /** Human-readable description of the alert. */
    private String desc;

    /** Creates an empty warning (required for POJO deserialization). */
    public Warning() {
    }

    /**
     * @param userId    id of the user the alert is about
     * @param startTime event time of the first failed login
     * @param endTime   event time of the second failed login
     * @param desc      human-readable description
     */
    public Warning(Integer userId, Long startTime, Long endTime, String desc) {
        this.userId = userId;
        this.startTime = startTime;
        this.endTime = endTime;
        this.desc = desc;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Long getStartTime() {
        return startTime;
    }

    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    public Long getEndTime() {
        return endTime;
    }

    public void setEndTime(Long endTime) {
        this.endTime = endTime;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        // '\'' closes the quote around desc (the scraped source had the invalid literal ''').
        return "Warning{" +
                "userId=" + userId +
                ", startTime=" + startTime +
                ", endTime=" + endTime +
                ", desc='" + desc + '\'' +
                '}';
    }
}
5. 执行程序
Login failed twice:8> Warning{userId=2, startTime=1575600182000, endTime=1575600183000, desc='Login failed twice'}
timeout:8> LoginEvent{userId=3, eventType='fail1', eventTime=1575600184000}

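这里的超时是指:已经找到了满足模式起始条件(begin)的事件,但在限定的时间范围内没有找到满足第二个条件(next)的事件,于是形成一条超时记录。例如用户 3 的 fail1 与 fail2 相隔 5 秒,没有落在 within(5 秒) 的范围内,所以 fail1 作为超时数据输出;用户 2 的两次失败相隔 1 秒,因此产生了一条 Warning。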

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735474.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号