栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

需求:

从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次, 则是恶意登录),从而找到哪些用户名是恶意登录。

package cep

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

import java.util


case class LoginEvent(id:Long,name:String,eventType:String,eventTime:Long)
object TestCEPByLogin {
  def main(args: Array[String]): Unit = {
    val streamEvn = StreamExecutionEnvironment.getExecutionEnvironment
    streamEvn.setParallelism(1)
    streamEvn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置时间语义
    import org.apache.flink.streaming.api.scala._
    //1.输入事件流的创建
    //读取登录日志
    val stream: DataStream[LoginEvent] = streamEvn.fromCollection(List(
      new LoginEvent(1, "yqq", "fail", 1577080451),//这里单位秒
      new LoginEvent(2, "yqq", "fail", 1577080452),
      new LoginEvent(3, "yqq", "fail", 1577080453),
      new LoginEvent(4, "zifan", "fail", 1577080459),
      new LoginEvent(4, "zifan", "success", 1577080460),
      new LoginEvent(5, "yqq", "fail", 1577080463)
    )).assignAscendingTimestamps(_.eventTime*1000) //指定EventTime的时候必须确保到时间戳(毫秒)
    //2.定义模式(Pattern)
    val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("start").where(_.eventType.equals("fail"))
      .next("fail2").where(_.eventType.equals("fail"))
      .next("fail3").where(_.eventType.equals("fail"))
      .within(Time.seconds(10)) //时间限制,
    //3.检测Pattern
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(stream.keyBy(_.name), pattern) //根据用户名分组
    //4.选择结果并输出
    val result: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        val keyIter: util.Iterator[String] = map.keySet().iterator()
        val e1: LoginEvent = map.get(keyIter.next()).iterator().next()
        val e2: LoginEvent = map.get(keyIter.next()).iterator().next()
        val e3: LoginEvent = map.get(keyIter.next()).iterator().next()
        "用户名:" + e1.name + "登录时间" + ":" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime
      }
    })
    result.print()
    streamEvn.execute()
  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684905.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号